Minor reordering to match param order
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46                                   action="store", type="int",
47                                   dest="pause_period",
48                                   help=("Amount of time in seconds watcher"
49                                         " should be suspended from running"))
50 _CLUSTERMERGE_ECID = "clustermerge-ecid"
51
52
53 def Flatten(unflattened_list):
54   """Flattens a list.
55
56   @param unflattened_list: A list of unflattened list objects.
57   @return: A flattened list
58
59   """
60   flattened_list = []
61
62   for item in unflattened_list:
63     if isinstance(item, list):
64       flattened_list.extend(Flatten(item))
65     else:
66       flattened_list.append(item)
67   return flattened_list
68
69
70 class MergerData(object):
71   """Container class to hold data used for merger.
72
73   """
74   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
75     """Initialize the container.
76
77     @param cluster: The name of the cluster
78     @param key_path: Path to the ssh private key used for authentication
79     @param nodes: List of nodes in the merging cluster
80     @param instances: List of instances running on merging cluster
81     @param config_path: Path to the merging cluster config
82
83     """
84     self.cluster = cluster
85     self.key_path = key_path
86     self.nodes = nodes
87     self.instances = instances
88     self.config_path = config_path
89
90
91 class Merger(object):
92   """Handling the merge.
93
94   """
95   def __init__(self, clusters, pause_period):
96     """Initialize object with sane defaults and infos required.
97
98     @param clusters: The list of clusters to merge in
99     @param pause_period: The time watcher shall be disabled for
100
101     """
102     self.merger_data = []
103     self.clusters = clusters
104     self.pause_period = pause_period
105     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
106     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
107     self.ssh_runner = ssh.SshRunner(self.cluster_name)
108
109   def Setup(self):
110     """Sets up our end so we can do the merger.
111
112     This method is setting us up as a preparation for the merger.
113     It makes the initial contact and gathers information needed.
114
115     @raise errors.RemoteError: for errors in communication/grabbing
116
117     """
118     (remote_path, _, _) = ssh.GetUserFiles("root")
119
120     if self.cluster_name in self.clusters:
121       raise errors.CommandError("Cannot merge cluster %s with itself" %
122                                 self.cluster_name)
123
124     # Fetch remotes private key
125     for cluster in self.clusters:
126       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
127                             ask_key=False)
128       if result.failed:
129         raise errors.RemoteError("There was an error while grabbing ssh private"
130                                  " key from %s. Fail reason: %s; output: %s" %
131                                  (cluster, result.fail_reason, result.output))
132
133       key_path = utils.PathJoin(self.work_dir, cluster)
134       utils.WriteFile(key_path, mode=0600, data=result.stdout)
135
136       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
137                             private_key=key_path)
138       if result.failed:
139         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
140                                  " Fail reason: %s; output: %s" %
141                                  (cluster, result.fail_reason, result.output))
142       nodes = result.stdout.splitlines()
143
144       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
145                             private_key=key_path)
146       if result.failed:
147         raise errors.RemoteError("Unable to retrieve list of instances from"
148                                  " %s. Fail reason: %s; output: %s" %
149                                  (cluster, result.fail_reason, result.output))
150       instances = result.stdout.splitlines()
151
152       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
153
154   def _PrepareAuthorizedKeys(self):
155     """Prepare the authorized_keys on every merging node.
156
157     This method add our public key to remotes authorized_key for further
158     communication.
159
160     """
161     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
162     pub_key = utils.ReadFile(pub_key_file)
163
164     for data in self.merger_data:
165       for node in data.nodes:
166         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
167                                      (auth_keys, pub_key)),
168                               private_key=data.key_path)
169
170         if result.failed:
171           raise errors.RemoteError("Unable to add our public key to %s in %s."
172                                    " Fail reason: %s; output: %s" %
173                                    (node, data.cluster, result.fail_reason,
174                                     result.output))
175
176   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
177               strict_host_check=False, private_key=None, batch=True,
178               ask_key=False):
179     """Wrapping SshRunner.Run with default parameters.
180
181     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
182
183     """
184     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
185                                use_cluster_key=use_cluster_key,
186                                strict_host_check=strict_host_check,
187                                private_key=private_key, batch=batch,
188                                ask_key=ask_key)
189
190   def _StopMergingInstances(self):
191     """Stop instances on merging clusters.
192
193     """
194     for cluster in self.clusters:
195       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
196                                      " --force-multiple")
197
198       if result.failed:
199         raise errors.RemoteError("Unable to stop instances on %s."
200                                  " Fail reason: %s; output: %s" %
201                                  (cluster, result.fail_reason, result.output))
202
203   def _DisableWatcher(self):
204     """Disable watch on all merging clusters, including ourself.
205
206     """
207     for cluster in ["localhost"] + self.clusters:
208       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
209                                      self.pause_period)
210
211       if result.failed:
212         raise errors.RemoteError("Unable to pause watcher on %s."
213                                  " Fail reason: %s; output: %s" %
214                                  (cluster, result.fail_reason, result.output))
215
216   def _StopDaemons(self):
217     """Stop all daemons on merging nodes.
218
219     """
220     cmd = "%s stop-all" % constants.DAEMON_UTIL
221     for data in self.merger_data:
222       for node in data.nodes:
223         result = self._RunCmd(node, cmd)
224
225         if result.failed:
226           raise errors.RemoteError("Unable to stop daemons on %s."
227                                    " Fail reason: %s; output: %s." %
228                                    (node, result.fail_reason, result.output))
229
230   def _FetchRemoteConfig(self):
231     """Fetches and stores remote cluster config from the master.
232
233     This step is needed before we can merge the config.
234
235     """
236     for data in self.merger_data:
237       result = self._RunCmd(data.cluster, "cat %s" %
238                                           constants.CLUSTER_CONF_FILE)
239
240       if result.failed:
241         raise errors.RemoteError("Unable to retrieve remote config on %s."
242                                  " Fail reason: %s; output %s" %
243                                  (data.cluster, result.fail_reason,
244                                   result.output))
245
246       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
247                                         data.cluster)
248       utils.WriteFile(data.config_path, data=result.stdout)
249
250   # R0201: Method could be a function
251   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
252     """Kills the local master daemon.
253
254     @raise errors.CommandError: If unable to kill
255
256     """
257     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
258     if result.failed:
259       raise errors.CommandError("Unable to stop master daemons."
260                                 " Fail reason: %s; output: %s" %
261                                 (result.fail_reason, result.output))
262
263   def _MergeConfig(self):
264     """Merges all foreign config into our own config.
265
266     """
267     my_config = config.ConfigWriter(offline=True)
268     fake_ec_id = 0 # Needs to be uniq over the whole config merge
269
270     for data in self.merger_data:
271       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
272       self._MergeNodeGroups(my_config, other_config)
273
274       for node in other_config.GetNodeList():
275         node_info = other_config.GetNodeInfo(node)
276         node_info.master_candidate = False
277         my_config.AddNode(node_info, str(fake_ec_id))
278         fake_ec_id += 1
279
280       for instance in other_config.GetInstanceList():
281         instance_info = other_config.GetInstanceInfo(instance)
282
283         # Update the DRBD port assignments
284         # This is a little bit hackish
285         for dsk in instance_info.disks:
286           if dsk.dev_type in constants.LDS_DRBD:
287             port = my_config.AllocatePort()
288
289             logical_id = list(dsk.logical_id)
290             logical_id[2] = port
291             dsk.logical_id = tuple(logical_id)
292
293             physical_id = list(dsk.physical_id)
294             physical_id[1] = physical_id[3] = port
295             dsk.physical_id = tuple(physical_id)
296
297         my_config.AddInstance(instance_info, str(fake_ec_id))
298         fake_ec_id += 1
299
300   # R0201: Method could be a function
301   def _MergeNodeGroups(self, my_config, other_config):
302     """Adds foreign node groups
303
304     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
305     """
306     # pylint: disable-msg=R0201
307     for grp in other_config.GetAllNodeGroupsInfo().values():
308       #TODO: handle node group conflicts
309       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
310
311   # R0201: Method could be a function
312   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
313     """Starts the local master daemon.
314
315     @param no_vote: Should the masterd started without voting? default: False
316     @raise errors.CommandError: If unable to start daemon.
317
318     """
319     env = {}
320     if no_vote:
321       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
322
323     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
324     if result.failed:
325       raise errors.CommandError("Couldn't start ganeti master."
326                                 " Fail reason: %s; output: %s" %
327                                 (result.fail_reason, result.output))
328
329   def _ReaddMergedNodesAndRedist(self):
330     """Readds all merging nodes and make sure their config is up-to-date.
331
332     @raise errors.CommandError: If anything fails.
333
334     """
335     for data in self.merger_data:
336       for node in data.nodes:
337         result = utils.RunCmd(["gnt-node", "add", "--readd",
338                                "--no-ssh-key-check", "--force-join", node])
339         if result.failed:
340           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
341                                     " output: %s" % (node, result.fail_reason,
342                                                      result.output))
343
344     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
345     if result.failed:
346       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
347                                 " output: %s" % (result.fail_reason,
348                                                 result.output))
349
350   # R0201: Method could be a function
351   def _StartupAllInstances(self): # pylint: disable-msg=R0201
352     """Starts up all instances (locally).
353
354     @raise errors.CommandError: If unable to start clusters
355
356     """
357     result = utils.RunCmd(["gnt-instance", "startup", "--all",
358                            "--force-multiple"])
359     if result.failed:
360       raise errors.CommandError("Unable to start all instances."
361                                 " Fail reason: %s; output: %s" %
362                                 (result.fail_reason, result.output))
363
364   # R0201: Method could be a function
365   def _VerifyCluster(self): # pylint: disable-msg=R0201
366     """Runs gnt-cluster verify to verify the health.
367
368     @raise errors.ProgrammError: If cluster fails on verification
369
370     """
371     result = utils.RunCmd(["gnt-cluster", "verify"])
372     if result.failed:
373       raise errors.CommandError("Verification of cluster failed."
374                                 " Fail reason: %s; output: %s" %
375                                 (result.fail_reason, result.output))
376
377   def Merge(self):
378     """Does the actual merge.
379
380     It runs all the steps in the right order and updates the user about steps
381     taken. Also it keeps track of rollback_steps to undo everything.
382
383     """
384     rbsteps = []
385     try:
386       logging.info("Pre cluster verification")
387       self._VerifyCluster()
388
389       logging.info("Prepare authorized_keys")
390       rbsteps.append("Remove our key from authorized_keys on nodes:"
391                      " %(nodes)s")
392       self._PrepareAuthorizedKeys()
393
394       rbsteps.append("Start all instances again on the merging"
395                      " clusters: %(clusters)s")
396       logging.info("Stopping merging instances (takes a while)")
397       self._StopMergingInstances()
398
399       logging.info("Disable watcher")
400       self._DisableWatcher()
401       logging.info("Stop daemons on merging nodes")
402       self._StopDaemons()
403       logging.info("Merging config")
404       self._FetchRemoteConfig()
405
406       def _OfflineClusterMerge(_):
407         """Closure run when master daemons stopped
408
409         """
410         rbsteps.append("Restore %s from another master candidate" %
411                        constants.CLUSTER_CONF_FILE)
412         self._MergeConfig()
413         self._StartMasterDaemon(no_vote=True)
414
415         # Point of no return, delete rbsteps
416         del rbsteps[:]
417
418         logging.warning("We are at the point of no return. Merge can not easily"
419                         " be undone after this point.")
420         logging.info("Readd nodes and redistribute config")
421         self._ReaddMergedNodesAndRedist()
422         self._KillMasterDaemon()
423
424       cli.RunWhileClusterStopped(logging.info, _OfflineClusterMerge)
425
426       logging.info("Starting instances again")
427       self._StartupAllInstances()
428       logging.info("Post cluster verification")
429       self._VerifyCluster()
430     except errors.GenericError, e:
431       logging.exception(e)
432
433       if rbsteps:
434         nodes = Flatten([data.nodes for data in self.merger_data])
435         info = {
436           "clusters": self.clusters,
437           "nodes": nodes,
438           }
439         logging.critical("In order to rollback do the following:")
440         for step in rbsteps:
441           logging.critical("  * %s", step % info)
442       else:
443         logging.critical("Nothing to rollback.")
444
445       # TODO: Keep track of steps done for a flawless resume?
446
447   def Cleanup(self):
448     """Clean up our environment.
449
450     This cleans up remote private keys and configs and after that
451     deletes the temporary directory.
452
453     """
454     shutil.rmtree(self.work_dir)
455
456
457 def SetupLogging(options):
458   """Setting up logging infrastructure.
459
460   @param options: Parsed command line options
461
462   """
463   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
464
465   stderr_handler = logging.StreamHandler()
466   stderr_handler.setFormatter(formatter)
467   if options.debug:
468     stderr_handler.setLevel(logging.NOTSET)
469   elif options.verbose:
470     stderr_handler.setLevel(logging.INFO)
471   else:
472     stderr_handler.setLevel(logging.ERROR)
473
474   root_logger = logging.getLogger("")
475   root_logger.setLevel(logging.NOTSET)
476   root_logger.addHandler(stderr_handler)
477
478
479 def main():
480   """Main routine.
481
482   """
483   program = os.path.basename(sys.argv[0])
484
485   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
486                                         " [--watcher-pause-period SECONDS]"
487                                         " <cluster> <cluster...>"),
488                                         prog=program)
489   parser.add_option(cli.DEBUG_OPT)
490   parser.add_option(cli.VERBOSE_OPT)
491   parser.add_option(PAUSE_PERIOD_OPT)
492
493   (options, args) = parser.parse_args()
494
495   parser.error("Unfortunately this tool is currently broken and cannot"
496                " be used. Sorry!")
497
498   SetupLogging(options)
499
500   if not args:
501     parser.error("No clusters specified")
502
503   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
504   try:
505     try:
506       cluster_merger.Setup()
507       cluster_merger.Merge()
508     except errors.GenericError, e:
509       logging.exception(e)
510       return constants.EXIT_FAILURE
511   finally:
512     cluster_merger.Cleanup()
513
514   return constants.EXIT_SUCCESS
515
516
517 if __name__ == "__main__":
518   sys.exit(main())