Switch from os.path.join to utils.PathJoin
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46                                   action="store", type="int",
47                                   dest="pause_period",
48                                   help=("Amount of time in seconds watcher"
49                                         " should be suspended from running"))
50
51
52 def Flatten(unflatten_list):
53   """Flattens a list.
54
55   @param unflatten_list: A list of unflatten list objects.
56   @return: A flatten list
57
58   """
59   flatten_list = []
60
61   for item in unflatten_list:
62     if isinstance(item, list):
63       flatten_list.extend(Flatten(item))
64     else:
65       flatten_list.append(item)
66   return flatten_list
67
68
69 class MergerData(object):
70   """Container class to hold data used for merger.
71
72   """
73   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
74     """Initialize the container.
75
76     @param cluster: The name of the cluster
77     @param key_path: Path to the ssh private key used for authentication
78     @param config_path: Path to the merging cluster config
79     @param nodes: List of nodes in the merging cluster
80     @param instances: List of instances running on merging cluster
81
82     """
83     self.cluster = cluster
84     self.key_path = key_path
85     self.config_path = config_path
86     self.instances = instances
87     self.nodes = nodes
88
89
90 class Merger(object):
91   """Handling the merge.
92
93   """
94   def __init__(self, clusters, pause_period):
95     """Initialize object with sane defaults and infos required.
96
97     @param clusters: The list of clusters to merge in
98     @param pause_period: The time watcher shall be disabled for
99
100     """
101     self.merger_data = []
102     self.clusters = clusters
103     self.pause_period = pause_period
104     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105     self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106     self.ssh_runner = ssh.SshRunner(self.cluster_name)
107
108   def Setup(self):
109     """Sets up our end so we can do the merger.
110
111     This method is setting us up as a preparation for the merger.
112     It makes the initial contact and gathers information needed.
113
114     @raise errors.RemoteError: for errors in communication/grabbing
115
116     """
117     (remote_path, _, _) = ssh.GetUserFiles("root")
118
119     # Fetch remotes private key
120     for cluster in self.clusters:
121       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
122                             ask_key=False)
123       if result.failed:
124         raise errors.RemoteError("There was an error while grabbing ssh private"
125                                  " key from %s. Fail reason: %s; output: %s" %
126                                  (cluster, result.fail_reason, result.output))
127
128       key_path = utils.PathJoin(self.work_dir, cluster)
129       utils.WriteFile(key_path, mode=0600, data=result.stdout)
130
131       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132                             private_key=key_path)
133       if result.failed:
134         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135                                  " Fail reason: %s; output: %s" %
136                                  (cluster, result.fail_reason, result.output))
137       nodes = result.stdout.splitlines()
138
139       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140                             private_key=key_path)
141       if result.failed:
142         raise errors.RemoteError("Unable to retrieve list of instances from"
143                                  " %s. Fail reason: %s; output: %s" %
144                                  (cluster, result.fail_reason, result.output))
145       instances = result.stdout.splitlines()
146
147       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148
149   def _PrepareAuthorizedKeys(self):
150     """Prepare the authorized_keys on every merging node.
151
152     This method add our public key to remotes authorized_key for further
153     communication.
154
155     """
156     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157     pub_key = utils.ReadFile(pub_key_file)
158
159     for data in self.merger_data:
160       for node in data.nodes:
161         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162                                      (auth_keys, pub_key)),
163                               private_key=data.key_path)
164
165         if result.failed:
166           raise errors.RemoteError("Unable to add our public key to %s in %s."
167                                    " Fail reason: %s; output: %s" %
168                                    (node, data.cluster, result.fail_reason,
169                                     result.output))
170
171   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172               strict_host_check=False, private_key=None, batch=True,
173               ask_key=False):
174     """Wrapping SshRunner.Run with default parameters.
175
176     For explanation of parameters see L{ssh.SshRunner.Run}.
177
178     """
179     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180                                use_cluster_key=use_cluster_key,
181                                strict_host_check=strict_host_check,
182                                private_key=private_key, batch=batch,
183                                ask_key=ask_key)
184
185   def _StopMergingInstances(self):
186     """Stop instances on merging clusters.
187
188     """
189     for cluster in self.clusters:
190       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
191                                      " --force-multiple")
192
193       if result.failed:
194         raise errors.RemoteError("Unable to stop instances on %s."
195                                  " Fail reason: %s; output: %s" %
196                                  (cluster, result.fail_reason, result.output))
197
198   def _DisableWatcher(self):
199     """Disable watch on all merging clusters, including ourself.
200
201     """
202     for cluster in ["localhost"] + self.clusters:
203       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
204                                      self.pause_period)
205
206       if result.failed:
207         raise errors.RemoteError("Unable to pause watcher on %s."
208                                  " Fail reason: %s; output: %s" %
209                                  (cluster, result.fail_reason, result.output))
210
211
212   # R0201: Method could be a function
213   def _EnableWatcher(self): # pylint: disable-msg=R0201
214     """Reenable watcher (locally).
215
216     """
217     result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
218
219     if result.failed:
220       logging.warning("Unable to continue watcher. Fail reason: %s;"
221                       " output: %s", result.fail_reason, result.output)
222
223   def _StopDaemons(self):
224     """Stop all daemons on merging nodes.
225
226     """
227     # FIXME: Worth to put this into constants?
228     cmds = []
229     for daemon in (constants.RAPI, constants.MASTERD,
230                    constants.NODED, constants.CONFD):
231       cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
232     for data in self.merger_data:
233       for node in data.nodes:
234         result = self._RunCmd(node, " && ".join(cmds))
235
236         if result.failed:
237           raise errors.RemoteError("Unable to stop daemons on %s."
238                                    " Fail reason: %s; output: %s." %
239                                    (node, result.fail_reason, result.output))
240
241   def _FetchRemoteConfig(self):
242     """Fetches and stores remote cluster config from the master.
243
244     This step is needed before we can merge the config.
245
246     """
247     for data in self.merger_data:
248       result = self._RunCmd(data.cluster, "cat %s" %
249                                           constants.CLUSTER_CONF_FILE)
250
251       if result.failed:
252         raise errors.RemoteError("Unable to retrieve remote config on %s."
253                                  " Fail reason: %s; output %s" %
254                                  (data.cluster, result.fail_reason,
255                                   result.output))
256
257       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258                                         data.cluster)
259       utils.WriteFile(data.config_path, data=result.stdout)
260
261   # R0201: Method could be a function
262   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263     """Kills the local master daemon.
264
265     @raise errors.CommandError: If unable to kill
266
267     """
268     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269     if result.failed:
270       raise errors.CommandError("Unable to stop master daemons."
271                                 " Fail reason: %s; output: %s" %
272                                 (result.fail_reason, result.output))
273
274   def _MergeConfig(self):
275     """Merges all foreign config into our own config.
276
277     """
278     my_config = config.ConfigWriter(offline=True)
279     fake_ec_id = 0 # Needs to be uniq over the whole config merge
280
281     for data in self.merger_data:
282       other_config = config.ConfigWriter(data.config_path)
283
284       for node in other_config.GetNodeList():
285         node_info = other_config.GetNodeInfo(node)
286         node_info.master_candidate = False
287         my_config.AddNode(node_info, str(fake_ec_id))
288         fake_ec_id += 1
289
290       for instance in other_config.GetInstanceList():
291         instance_info = other_config.GetInstanceInfo(instance)
292
293         # Update the DRBD port assignments
294         # This is a little bit hackish
295         for dsk in instance_info.disks:
296           if dsk.dev_type in constants.LDS_DRBD:
297             port = my_config.AllocatePort()
298
299             logical_id = list(dsk.logical_id)
300             logical_id[2] = port
301             dsk.logical_id = tuple(logical_id)
302
303             physical_id = list(dsk.physical_id)
304             physical_id[1] = physical_id[3] = port
305             dsk.physical_id = tuple(physical_id)
306
307         my_config.AddInstance(instance_info, str(fake_ec_id))
308         fake_ec_id += 1
309
310   # R0201: Method could be a function
311   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
312     """Starts the local master daemon.
313
314     @param no_vote: Should the masterd started without voting? default: False
315     @raise errors.CommandError: If unable to start daemon.
316
317     """
318     env = {}
319     if no_vote:
320       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
321
322     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
323     if result.failed:
324       raise errors.CommandError("Couldn't start ganeti master."
325                                 " Fail reason: %s; output: %s" %
326                                 (result.fail_reason, result.output))
327
328   def _ReaddMergedNodesAndRedist(self):
329     """Readds all merging nodes and make sure their config is up-to-date.
330
331     @raise errors.CommandError: If anything fails.
332
333     """
334     for data in self.merger_data:
335       for node in data.nodes:
336         result = utils.RunCmd(["gnt-node", "add", "--readd",
337                                "--no-ssh-key-check", node])
338         if result.failed:
339           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
340                                     " output: %s" % (node, result.fail_reason,
341                                                      result.output))
342
343     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
344     if result.failed:
345       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
346                                 " output: %s" % (result.fail_reason,
347                                                 result.output))
348
349   # R0201: Method could be a function
350   def _StartupAllInstances(self): # pylint: disable-msg=R0201
351     """Starts up all instances (locally).
352
353     @raise errors.CommandError: If unable to start clusters
354
355     """
356     result = utils.RunCmd(["gnt-instance", "startup", "--all",
357                            "--force-multiple"])
358     if result.failed:
359       raise errors.CommandError("Unable to start all instances."
360                                 " Fail reason: %s; output: %s" %
361                                 (result.fail_reason, result.output))
362
363   # R0201: Method could be a function
364   def _VerifyCluster(self): # pylint: disable-msg=R0201
365     """Runs gnt-cluster verify to verify the health.
366
367     @raise errors.ProgrammError: If cluster fails on verification
368
369     """
370     result = utils.RunCmd(["gnt-cluster", "verify"])
371     if result.failed:
372       raise errors.CommandError("Verification of cluster failed."
373                                 " Fail reason: %s; output: %s" %
374                                 (result.fail_reason, result.output))
375
376   def Merge(self):
377     """Does the actual merge.
378
379     It runs all the steps in the right order and updates the user about steps
380     taken. Also it keeps track of rollback_steps to undo everything.
381
382     """
383     rbsteps = []
384     try:
385       logging.info("Pre cluster verification")
386       self._VerifyCluster()
387
388       logging.info("Prepare authorized_keys")
389       rbsteps.append("Remove our key from authorized_keys on nodes:"
390                      " %(nodes)s")
391       self._PrepareAuthorizedKeys()
392
393       rbsteps.append("Start all instances again on the merging"
394                      " clusters: %(clusters)s")
395       logging.info("Stopping merging instances (takes a while)")
396       self._StopMergingInstances()
397
398       logging.info("Disable watcher")
399       self._DisableWatcher()
400       logging.info("Stop daemons on merging nodes")
401       self._StopDaemons()
402       logging.info("Merging config")
403       self._FetchRemoteConfig()
404       self._KillMasterDaemon()
405
406       rbsteps.append("Restore %s from another master candidate" %
407                      constants.CLUSTER_CONF_FILE)
408       self._MergeConfig()
409       self._StartMasterDaemon(no_vote=True)
410
411       # Point of no return, delete rbsteps
412       del rbsteps[:]
413
414       logging.warning("We are at the point of no return. Merge can not easily"
415                       " be undone after this point.")
416       logging.info("Readd nodes and redistribute config")
417       self._ReaddMergedNodesAndRedist()
418       self._KillMasterDaemon()
419       self._StartMasterDaemon()
420       logging.info("Starting instances again")
421       self._StartupAllInstances()
422       logging.info("Post cluster verification")
423       self._VerifyCluster()
424     except errors.GenericError, e:
425       logging.exception(e)
426
427       if rbsteps:
428         nodes = Flatten([data.nodes for data in self.merger_data])
429         info = {
430           "clusters": self.clusters,
431           "nodes": nodes,
432           }
433         logging.critical("In order to rollback do the following:")
434         for step in rbsteps:
435           logging.critical("  * %s", step % info)
436       else:
437         logging.critical("Nothing to rollback.")
438
439       # TODO: Keep track of steps done for a flawless resume?
440
441   def Cleanup(self):
442     """Clean up our environment.
443
444     This cleans up remote private keys and configs and after that
445     deletes the temporary directory.
446
447     """
448     shutil.rmtree(self.work_dir)
449
450
451 def SetupLogging(options):
452   """Setting up logging infrastructure.
453
454   @param options: Parsed command line options
455
456   """
457   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
458
459   stderr_handler = logging.StreamHandler()
460   stderr_handler.setFormatter(formatter)
461   if options.debug:
462     stderr_handler.setLevel(logging.NOTSET)
463   elif options.verbose:
464     stderr_handler.setLevel(logging.INFO)
465   else:
466     stderr_handler.setLevel(logging.ERROR)
467
468   root_logger = logging.getLogger("")
469   root_logger.setLevel(logging.NOTSET)
470   root_logger.addHandler(stderr_handler)
471
472
473 def main():
474   """Main routine.
475
476   """
477   program = os.path.basename(sys.argv[0])
478
479   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
480                                         " [--watcher-pause-period SECONDS]"
481                                         " <cluster> <cluster...>"),
482                                         prog=program)
483   parser.add_option(cli.DEBUG_OPT)
484   parser.add_option(cli.VERBOSE_OPT)
485   parser.add_option(PAUSE_PERIOD_OPT)
486
487   (options, args) = parser.parse_args()
488
489   SetupLogging(options)
490
491   if not args:
492     parser.error("No clusters specified")
493
494   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
495   try:
496     try:
497       cluster_merger.Setup()
498       cluster_merger.Merge()
499     except errors.GenericError, e:
500       logging.exception(e)
501       return constants.EXIT_FAILURE
502   finally:
503     cluster_merger.Cleanup()
504
505   return constants.EXIT_SUCCESS
506
507
508 if __name__ == "__main__":
509   sys.exit(main())