Adding tool for automated cluster-merger
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46                                   action="store", type="int",
47                                   dest="pause_period",
48                                   help=("Amount of time in seconds watcher"
49                                         " should be suspended from running"))
50
51
52 def Flatten(unflatten_list):
53   """Flattens a list.
54
55   @param unflatten_list: A list of unflatten list objects.
56   @return: A flatten list
57
58   """
59   flatten_list = []
60
61   for item in unflatten_list:
62     if isinstance(item, list):
63       flatten_list.extend(Flatten(item))
64     else:
65       flatten_list.append(item)
66   return flatten_list
67
68
69 class MergerData(object):
70   """Container class to hold data used for merger.
71
72   """
73   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
74     """Initialize the container.
75
76     @param cluster: The name of the cluster
77     @param key_path: Path to the ssh private key used for authentication
78     @param config_path: Path to the merging cluster config
79     @param nodes: List of nodes in the merging cluster
80     @param instances: List of instances running on merging cluster
81
82     """
83     self.cluster = cluster
84     self.key_path = key_path
85     self.config_path = config_path
86     self.instances = instances
87     self.nodes = nodes
88
89
90 class Merger(object):
91   """Handling the merge.
92
93   """
94   def __init__(self, clusters, pause_period):
95     """Initialize object with sane defaults and infos required.
96
97     @param clusters: The list of clusters to merge in
98     @param pause_period: The time watcher shall be disabled for
99
100     """
101     self.merger_data = []
102     self.clusters = clusters
103     self.pause_period = pause_period
104     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105     self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106     self.ssh_runner = ssh.SshRunner(self.cluster_name)
107
108   def Setup(self):
109     """Sets up our end so we can do the merger.
110
111     This method is setting us up as a preparation for the merger.
112     It makes the initial contact and gathers information needed.
113
114     @raise errors.RemoteError: for errors in communication/grabbing
115
116     """
117     (remote_path, _, _) = ssh.GetUserFiles("root")
118
119     # Fetch remotes private key
120     for cluster in self.clusters:
121       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
122                             ask_key=False)
123       if result.failed:
124         raise errors.RemoteError("There was an error while grabbing ssh private"
125                                  " key from %s. Fail reason: %s; output: %s" %
126                                  (cluster, result.fail_reason, result.output))
127
128       key_path = os.path.join(self.work_dir, cluster)
129       utils.WriteFile(key_path, mode=0600, data=result.stdout)
130
131       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132                             private_key=key_path)
133       if result.failed:
134         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135                                  " Fail reason: %s; output: %s" %
136                                  (cluster, result.fail_reason, result.output))
137       nodes = result.stdout.splitlines()
138
139       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140                             private_key=key_path)
141       if result.failed:
142         raise errors.RemoteError("Unable to retrieve list of instances from"
143                                  " %s. Fail reason: %s; output: %s" %
144                                  (cluster, result.fail_reason, result.output))
145       instances = result.stdout.splitlines()
146
147       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148
149   def _PrepareAuthorizedKeys(self):
150     """Prepare the authorized_keys on every merging node.
151
152     This method add our public key to remotes authorized_key for further
153     communication.
154
155     """
156     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157     pub_key = utils.ReadFile(pub_key_file)
158
159     for data in self.merger_data:
160       for node in data.nodes:
161         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162                                      (auth_keys, pub_key)),
163                               private_key=data.key_path)
164
165         if result.failed:
166           raise errors.RemoteError("Unable to add our public key to %s in %s."
167                                    " Fail reason: %s; output: %s" %
168                                    (node, data.cluster, result.fail_reason,
169                                     result.output))
170
171   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172               strict_host_check=False, private_key=None, batch=True,
173               ask_key=False):
174     """Wrapping SshRunner.Run with default parameters.
175
176     For explanation of parameters see L{ssh.SshRunner.Run}.
177
178     """
179     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180                                use_cluster_key=use_cluster_key,
181                                strict_host_check=strict_host_check,
182                                private_key=private_key, batch=batch,
183                                ask_key=ask_key)
184
185   def _StopMergingInstances(self):
186     """Stop instances on merging clusters.
187
188     """
189     for cluster in self.clusters:
190       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
191                                      " --force-multiple")
192
193       if result.failed:
194         raise errors.RemoteError("Unable to stop instances on %s."
195                                  " Fail reason: %s; output: %s" %
196                                  (cluster, result.fail_reason, result.output))
197
198   def _DisableWatcher(self):
199     """Disable watch on all merging clusters, including ourself.
200
201     """
202     for cluster in ["localhost"] + self.clusters:
203       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
204                                      self.pause_period)
205
206       if result.failed:
207         raise errors.RemoteError("Unable to pause watcher on %s."
208                                  " Fail reason: %s; output: %s" %
209                                  (cluster, result.fail_reason, result.output))
210
211
212   # R0201: Method could be a function
213   def _EnableWatcher(self): # pylint: disable-msg=R0201
214     """Reenable watcher (locally).
215
216     """
217     result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
218
219     if result.failed:
220       logging.warning("Unable to continue watcher. Fail reason: %s;"
221                       " output: %s" % (result.fail_reason,
222                                        result.output))
223
224   def _StopDaemons(self):
225     """Stop all daemons on merging nodes.
226
227     """
228     # FIXME: Worth to put this into constants?
229     cmds = []
230     for daemon in (constants.RAPI, constants.MASTERD,
231                    constants.NODED, constants.CONFD):
232       cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
233     for data in self.merger_data:
234       for node in data.nodes:
235         result = self._RunCmd(node, " && ".join(cmds))
236
237         if result.failed:
238           raise errors.RemoteError("Unable to stop daemons on %s."
239                                    " Fail reason: %s; output: %s." %
240                                    (node, result.fail_reason, result.output))
241
242   def _FetchRemoteConfig(self):
243     """Fetches and stores remote cluster config from the master.
244
245     This step is needed before we can merge the config.
246
247     """
248     for data in self.merger_data:
249       result = self._RunCmd(data.cluster, "cat %s" %
250                                           constants.CLUSTER_CONF_FILE)
251
252       if result.failed:
253         raise errors.RemoteError("Unable to retrieve remote config on %s."
254                                  " Fail reason: %s; output %s" %
255                                  (data.cluster, result.fail_reason,
256                                   result.output))
257
258       data.config_path = os.path.join(self.work_dir, "%s_config.data" %
259                                                      data.cluster)
260       utils.WriteFile(data.config_path, data=result.stdout)
261
262   # R0201: Method could be a function
263   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
264     """Kills the local master daemon.
265
266     @raise errors.CommandError: If unable to kill
267
268     """
269     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
270     if result.failed:
271       raise errors.CommandError("Unable to stop master daemons."
272                                 " Fail reason: %s; output: %s" %
273                                 (result.fail_reason, result.output))
274
275   def _MergeConfig(self):
276     """Merges all foreign config into our own config.
277
278     """
279     my_config = config.ConfigWriter(offline=True)
280     fake_ec_id = 0 # Needs to be uniq over the whole config merge
281
282     for data in self.merger_data:
283       other_config = config.ConfigWriter(data.config_path)
284
285       for node in other_config.GetNodeList():
286         node_info = other_config.GetNodeInfo(node)
287         node_info.master_candidate = False
288         my_config.AddNode(node_info, str(fake_ec_id))
289         fake_ec_id += 1
290
291       for instance in other_config.GetInstanceList():
292         instance_info = other_config.GetInstanceInfo(instance)
293
294         # Update the DRBD port assignments
295         # This is a little bit hackish
296         for dsk in instance_info.disks:
297           if dsk.dev_type in constants.LDS_DRBD:
298             port = my_config.AllocatePort()
299
300             logical_id = list(dsk.logical_id)
301             logical_id[2] = port
302             dsk.logical_id = tuple(logical_id)
303
304             physical_id = list(dsk.physical_id)
305             physical_id[1] = physical_id[3] = port
306             dsk.physical_id = tuple(physical_id)
307
308         my_config.AddInstance(instance_info, str(fake_ec_id))
309         fake_ec_id += 1
310
311   # R0201: Method could be a function
312   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
313     """Starts the local master daemon.
314
315     @param no_vote: Should the masterd started without voting? default: False
316     @raise errors.CommandError: If unable to start daemon.
317
318     """
319     env = {}
320     if no_vote:
321       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
322
323     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
324     if result.failed:
325       raise errors.CommandError("Couldn't start ganeti master."
326                                 " Fail reason: %s; output: %s" %
327                                 (result.fail_reason, result.output))
328
329   def _ReaddMergedNodesAndRedist(self):
330     """Readds all merging nodes and make sure their config is up-to-date.
331
332     @raise errors.CommandError: If anything fails.
333
334     """
335     for data in self.merger_data:
336       for node in data.nodes:
337         result = utils.RunCmd(["gnt-node", "add", "--readd",
338                                "--no-ssh-key-check", node])
339         if result.failed:
340           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
341                                     " output: %s" % (node, result.fail_reason,
342                                                      result.output))
343
344     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
345     if result.failed:
346       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
347                                 " output: %s" % (result.fail_reason,
348                                                 result.output))
349
350   # R0201: Method could be a function
351   def _StartupAllInstances(self): # pylint: disable-msg=R0201
352     """Starts up all instances (locally).
353
354     @raise errors.CommandError: If unable to start clusters
355
356     """
357     result = utils.RunCmd(["gnt-instance", "startup", "--all",
358                            "--force-multiple"])
359     if result.failed:
360       raise errors.CommandError("Unable to start all instances."
361                                 " Fail reason: %s; output: %s" %
362                                 (result.fail_reason, result.output))
363
364   # R0201: Method could be a function
365   def _VerifyCluster(self): # pylint: disable-msg=R0201
366     """Runs gnt-cluster verify to verify the health.
367
368     @raise errors.ProgrammError: If cluster fails on verification
369
370     """
371     result = utils.RunCmd(["gnt-cluster", "verify"])
372     if result.failed:
373       raise errors.CommandError("Verification of cluster failed."
374                                 " Fail reason: %s; output: %s" %
375                                 (result.fail_reason, result.output))
376
377   def Merge(self):
378     """Does the actual merge.
379
380     It runs all the steps in the right order and updates the user about steps
381     taken. Also it keeps track of rollback_steps to undo everything.
382
383     """
384     rbsteps = []
385     try:
386       logging.info("Pre cluster verification")
387       self._VerifyCluster()
388
389       logging.info("Prepare authorized_keys")
390       rbsteps.append("Remove our key from authorized_keys on nodes:"
391                      " %(nodes)s")
392       self._PrepareAuthorizedKeys()
393
394       rbsteps.append("Start all instances again on the merging"
395                      " clusters: %(clusters)s")
396       logging.info("Stopping merging instances (takes a while)")
397       self._StopMergingInstances()
398
399       logging.info("Disable watcher")
400       self._DisableWatcher()
401       logging.info("Stop daemons on merging nodes")
402       self._StopDaemons()
403       logging.info("Merging config")
404       self._FetchRemoteConfig()
405       self._KillMasterDaemon()
406
407       rbsteps.append("Restore %s from another master candidate" %
408                      constants.CLUSTER_CONF_FILE)
409       self._MergeConfig()
410       self._StartMasterDaemon(no_vote=True)
411
412       # Point of no return, delete rbsteps
413       del rbsteps[:]
414
415       logging.warning("We are at the point of no return. Merge can not easily"
416                       " be undone after this point.")
417       logging.info("Readd nodes and redistribute config")
418       self._ReaddMergedNodesAndRedist()
419       self._KillMasterDaemon()
420       self._StartMasterDaemon()
421       logging.info("Starting instances again")
422       self._StartupAllInstances()
423       logging.info("Post cluster verification")
424       self._VerifyCluster()
425     except errors.GenericError, e:
426       logging.exception(e)
427
428       if rbsteps:
429         nodes = Flatten([data.nodes for data in self.merger_data])
430         info = {
431           "clusters": self.clusters,
432           "nodes": nodes,
433           }
434         logging.critical("In order to rollback do the following:")
435         for step in rbsteps:
436           logging.critical("  * %s" % (step % info))
437       else:
438         logging.critical("Nothing to rollback.")
439
440       # TODO: Keep track of steps done for a flawless resume?
441
442   def Cleanup(self):
443     """Clean up our environment.
444
445     This cleans up remote private keys and configs and after that
446     deletes the temporary directory.
447
448     """
449     shutil.rmtree(self.work_dir)
450
451
452 def SetupLogging(options):
453   """Setting up logging infrastructure.
454
455   @param options: Parsed command line options
456
457   """
458   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
459
460   stderr_handler = logging.StreamHandler()
461   stderr_handler.setFormatter(formatter)
462   if options.debug:
463     stderr_handler.setLevel(logging.NOTSET)
464   elif options.verbose:
465     stderr_handler.setLevel(logging.INFO)
466   else:
467     stderr_handler.setLevel(logging.ERROR)
468
469   root_logger = logging.getLogger("")
470   root_logger.setLevel(logging.NOTSET)
471   root_logger.addHandler(stderr_handler)
472
473
474 def main():
475   """Main routine.
476
477   """
478   program = os.path.basename(sys.argv[0])
479
480   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
481                                         " [--watcher-pause-period SECONDS]"
482                                         " <cluster> <cluster...>"),
483                                         prog=program)
484   parser.add_option(cli.DEBUG_OPT)
485   parser.add_option(cli.VERBOSE_OPT)
486   parser.add_option(PAUSE_PERIOD_OPT)
487
488   (options, args) = parser.parse_args()
489
490   SetupLogging(options)
491
492   if not args:
493     parser.error("No clusters specified")
494
495   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
496   try:
497     try:
498       cluster_merger.Setup()
499       cluster_merger.Merge()
500     except errors.GenericError, e:
501       logging.exception(e)
502       return constants.EXIT_FAILURE
503   finally:
504     cluster_merger.Cleanup()
505
506   return constants.EXIT_SUCCESS
507
508
509 if __name__ == "__main__":
510   sys.exit(main())