Fix grammar of var naming
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46                                   action="store", type="int",
47                                   dest="pause_period",
48                                   help=("Amount of time in seconds watcher"
49                                         " should be suspended from running"))
50
51
52 def Flatten(unflattened_list):
53   """Flattens a list.
54
55   @param unflattened_list: A list of unflattened list objects.
56   @return: A flattened list
57
58   """
59   flattened_list = []
60
61   for item in unflattened_list:
62     if isinstance(item, list):
63       flattened_list.extend(Flatten(item))
64     else:
65       flattened_list.append(item)
66   return flattened_list
67
68
69 class MergerData(object):
70   """Container class to hold data used for merger.
71
72   """
73   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
74     """Initialize the container.
75
76     @param cluster: The name of the cluster
77     @param key_path: Path to the ssh private key used for authentication
78     @param config_path: Path to the merging cluster config
79     @param nodes: List of nodes in the merging cluster
80     @param instances: List of instances running on merging cluster
81
82     """
83     self.cluster = cluster
84     self.key_path = key_path
85     self.config_path = config_path
86     self.instances = instances
87     self.nodes = nodes
88
89
90 class Merger(object):
91   """Handling the merge.
92
93   """
94   def __init__(self, clusters, pause_period):
95     """Initialize object with sane defaults and infos required.
96
97     @param clusters: The list of clusters to merge in
98     @param pause_period: The time watcher shall be disabled for
99
100     """
101     self.merger_data = []
102     self.clusters = clusters
103     self.pause_period = pause_period
104     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105     self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106     self.ssh_runner = ssh.SshRunner(self.cluster_name)
107
108   def Setup(self):
109     """Sets up our end so we can do the merger.
110
111     This method is setting us up as a preparation for the merger.
112     It makes the initial contact and gathers information needed.
113
114     @raise errors.RemoteError: for errors in communication/grabbing
115
116     """
117     (remote_path, _, _) = ssh.GetUserFiles("root")
118
119     # Fetch remotes private key
120     for cluster in self.clusters:
121       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
122                             ask_key=False)
123       if result.failed:
124         raise errors.RemoteError("There was an error while grabbing ssh private"
125                                  " key from %s. Fail reason: %s; output: %s" %
126                                  (cluster, result.fail_reason, result.output))
127
128       key_path = utils.PathJoin(self.work_dir, cluster)
129       utils.WriteFile(key_path, mode=0600, data=result.stdout)
130
131       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132                             private_key=key_path)
133       if result.failed:
134         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135                                  " Fail reason: %s; output: %s" %
136                                  (cluster, result.fail_reason, result.output))
137       nodes = result.stdout.splitlines()
138
139       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140                             private_key=key_path)
141       if result.failed:
142         raise errors.RemoteError("Unable to retrieve list of instances from"
143                                  " %s. Fail reason: %s; output: %s" %
144                                  (cluster, result.fail_reason, result.output))
145       instances = result.stdout.splitlines()
146
147       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148
149   def _PrepareAuthorizedKeys(self):
150     """Prepare the authorized_keys on every merging node.
151
152     This method add our public key to remotes authorized_key for further
153     communication.
154
155     """
156     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157     pub_key = utils.ReadFile(pub_key_file)
158
159     for data in self.merger_data:
160       for node in data.nodes:
161         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162                                      (auth_keys, pub_key)),
163                               private_key=data.key_path)
164
165         if result.failed:
166           raise errors.RemoteError("Unable to add our public key to %s in %s."
167                                    " Fail reason: %s; output: %s" %
168                                    (node, data.cluster, result.fail_reason,
169                                     result.output))
170
171   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172               strict_host_check=False, private_key=None, batch=True,
173               ask_key=False):
174     """Wrapping SshRunner.Run with default parameters.
175
176     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
177
178     """
179     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180                                use_cluster_key=use_cluster_key,
181                                strict_host_check=strict_host_check,
182                                private_key=private_key, batch=batch,
183                                ask_key=ask_key)
184
185   def _StopMergingInstances(self):
186     """Stop instances on merging clusters.
187
188     """
189     for cluster in self.clusters:
190       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
191                                      " --force-multiple")
192
193       if result.failed:
194         raise errors.RemoteError("Unable to stop instances on %s."
195                                  " Fail reason: %s; output: %s" %
196                                  (cluster, result.fail_reason, result.output))
197
198   def _DisableWatcher(self):
199     """Disable watch on all merging clusters, including ourself.
200
201     """
202     for cluster in ["localhost"] + self.clusters:
203       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
204                                      self.pause_period)
205
206       if result.failed:
207         raise errors.RemoteError("Unable to pause watcher on %s."
208                                  " Fail reason: %s; output: %s" %
209                                  (cluster, result.fail_reason, result.output))
210
211   def _StopDaemons(self):
212     """Stop all daemons on merging nodes.
213
214     """
215     cmd = "%s stop-all" % constants.DAEMON_UTIL
216     for data in self.merger_data:
217       for node in data.nodes:
218         result = self._RunCmd(node, cmd)
219
220         if result.failed:
221           raise errors.RemoteError("Unable to stop daemons on %s."
222                                    " Fail reason: %s; output: %s." %
223                                    (node, result.fail_reason, result.output))
224
225   def _FetchRemoteConfig(self):
226     """Fetches and stores remote cluster config from the master.
227
228     This step is needed before we can merge the config.
229
230     """
231     for data in self.merger_data:
232       result = self._RunCmd(data.cluster, "cat %s" %
233                                           constants.CLUSTER_CONF_FILE)
234
235       if result.failed:
236         raise errors.RemoteError("Unable to retrieve remote config on %s."
237                                  " Fail reason: %s; output %s" %
238                                  (data.cluster, result.fail_reason,
239                                   result.output))
240
241       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
242                                         data.cluster)
243       utils.WriteFile(data.config_path, data=result.stdout)
244
245   # R0201: Method could be a function
246   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
247     """Kills the local master daemon.
248
249     @raise errors.CommandError: If unable to kill
250
251     """
252     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
253     if result.failed:
254       raise errors.CommandError("Unable to stop master daemons."
255                                 " Fail reason: %s; output: %s" %
256                                 (result.fail_reason, result.output))
257
258   def _MergeConfig(self):
259     """Merges all foreign config into our own config.
260
261     """
262     my_config = config.ConfigWriter(offline=True)
263     fake_ec_id = 0 # Needs to be uniq over the whole config merge
264
265     for data in self.merger_data:
266       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
267
268       for node in other_config.GetNodeList():
269         node_info = other_config.GetNodeInfo(node)
270         node_info.master_candidate = False
271         my_config.AddNode(node_info, str(fake_ec_id))
272         fake_ec_id += 1
273
274       for instance in other_config.GetInstanceList():
275         instance_info = other_config.GetInstanceInfo(instance)
276
277         # Update the DRBD port assignments
278         # This is a little bit hackish
279         for dsk in instance_info.disks:
280           if dsk.dev_type in constants.LDS_DRBD:
281             port = my_config.AllocatePort()
282
283             logical_id = list(dsk.logical_id)
284             logical_id[2] = port
285             dsk.logical_id = tuple(logical_id)
286
287             physical_id = list(dsk.physical_id)
288             physical_id[1] = physical_id[3] = port
289             dsk.physical_id = tuple(physical_id)
290
291         my_config.AddInstance(instance_info, str(fake_ec_id))
292         fake_ec_id += 1
293
294   # R0201: Method could be a function
295   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
296     """Starts the local master daemon.
297
298     @param no_vote: Should the masterd started without voting? default: False
299     @raise errors.CommandError: If unable to start daemon.
300
301     """
302     env = {}
303     if no_vote:
304       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
305
306     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
307     if result.failed:
308       raise errors.CommandError("Couldn't start ganeti master."
309                                 " Fail reason: %s; output: %s" %
310                                 (result.fail_reason, result.output))
311
312   def _ReaddMergedNodesAndRedist(self):
313     """Readds all merging nodes and make sure their config is up-to-date.
314
315     @raise errors.CommandError: If anything fails.
316
317     """
318     for data in self.merger_data:
319       for node in data.nodes:
320         result = utils.RunCmd(["gnt-node", "add", "--readd",
321                                "--no-ssh-key-check", node])
322         if result.failed:
323           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
324                                     " output: %s" % (node, result.fail_reason,
325                                                      result.output))
326
327     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
328     if result.failed:
329       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
330                                 " output: %s" % (result.fail_reason,
331                                                 result.output))
332
333   # R0201: Method could be a function
334   def _StartupAllInstances(self): # pylint: disable-msg=R0201
335     """Starts up all instances (locally).
336
337     @raise errors.CommandError: If unable to start clusters
338
339     """
340     result = utils.RunCmd(["gnt-instance", "startup", "--all",
341                            "--force-multiple"])
342     if result.failed:
343       raise errors.CommandError("Unable to start all instances."
344                                 " Fail reason: %s; output: %s" %
345                                 (result.fail_reason, result.output))
346
347   # R0201: Method could be a function
348   def _VerifyCluster(self): # pylint: disable-msg=R0201
349     """Runs gnt-cluster verify to verify the health.
350
351     @raise errors.ProgrammError: If cluster fails on verification
352
353     """
354     result = utils.RunCmd(["gnt-cluster", "verify"])
355     if result.failed:
356       raise errors.CommandError("Verification of cluster failed."
357                                 " Fail reason: %s; output: %s" %
358                                 (result.fail_reason, result.output))
359
360   def Merge(self):
361     """Does the actual merge.
362
363     It runs all the steps in the right order and updates the user about steps
364     taken. Also it keeps track of rollback_steps to undo everything.
365
366     """
367     rbsteps = []
368     try:
369       logging.info("Pre cluster verification")
370       self._VerifyCluster()
371
372       logging.info("Prepare authorized_keys")
373       rbsteps.append("Remove our key from authorized_keys on nodes:"
374                      " %(nodes)s")
375       self._PrepareAuthorizedKeys()
376
377       rbsteps.append("Start all instances again on the merging"
378                      " clusters: %(clusters)s")
379       logging.info("Stopping merging instances (takes a while)")
380       self._StopMergingInstances()
381
382       logging.info("Disable watcher")
383       self._DisableWatcher()
384       logging.info("Stop daemons on merging nodes")
385       self._StopDaemons()
386       logging.info("Merging config")
387       self._FetchRemoteConfig()
388
389       def _OfflineClusterMerge(_):
390         """Closure run when master daemons stopped
391
392         """
393         rbsteps.append("Restore %s from another master candidate" %
394                        constants.CLUSTER_CONF_FILE)
395         self._MergeConfig()
396         self._StartMasterDaemon(no_vote=True)
397
398         # Point of no return, delete rbsteps
399         del rbsteps[:]
400
401         logging.warning("We are at the point of no return. Merge can not easily"
402                         " be undone after this point.")
403         logging.info("Readd nodes and redistribute config")
404         self._ReaddMergedNodesAndRedist()
405         self._KillMasterDaemon()
406
407       cli.RunWhileClusterStopped(logging.info, _OfflineClusterMerge)
408
409       logging.info("Starting instances again")
410       self._StartupAllInstances()
411       logging.info("Post cluster verification")
412       self._VerifyCluster()
413     except errors.GenericError, e:
414       logging.exception(e)
415
416       if rbsteps:
417         nodes = Flatten([data.nodes for data in self.merger_data])
418         info = {
419           "clusters": self.clusters,
420           "nodes": nodes,
421           }
422         logging.critical("In order to rollback do the following:")
423         for step in rbsteps:
424           logging.critical("  * %s", step % info)
425       else:
426         logging.critical("Nothing to rollback.")
427
428       # TODO: Keep track of steps done for a flawless resume?
429
430   def Cleanup(self):
431     """Clean up our environment.
432
433     This cleans up remote private keys and configs and after that
434     deletes the temporary directory.
435
436     """
437     shutil.rmtree(self.work_dir)
438
439
440 def SetupLogging(options):
441   """Setting up logging infrastructure.
442
443   @param options: Parsed command line options
444
445   """
446   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
447
448   stderr_handler = logging.StreamHandler()
449   stderr_handler.setFormatter(formatter)
450   if options.debug:
451     stderr_handler.setLevel(logging.NOTSET)
452   elif options.verbose:
453     stderr_handler.setLevel(logging.INFO)
454   else:
455     stderr_handler.setLevel(logging.ERROR)
456
457   root_logger = logging.getLogger("")
458   root_logger.setLevel(logging.NOTSET)
459   root_logger.addHandler(stderr_handler)
460
461
462 def main():
463   """Main routine.
464
465   """
466   program = os.path.basename(sys.argv[0])
467
468   parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
469                                         " [--watcher-pause-period SECONDS]"
470                                         " <cluster> <cluster...>"),
471                                         prog=program)
472   parser.add_option(cli.DEBUG_OPT)
473   parser.add_option(cli.VERBOSE_OPT)
474   parser.add_option(PAUSE_PERIOD_OPT)
475
476   (options, args) = parser.parse_args()
477
478   SetupLogging(options)
479
480   if not args:
481     parser.error("No clusters specified")
482
483   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
484   try:
485     try:
486       cluster_merger.Setup()
487       cluster_merger.Merge()
488     except errors.GenericError, e:
489       logging.exception(e)
490       return constants.EXIT_FAILURE
491   finally:
492     cluster_merger.Cleanup()
493
494   return constants.EXIT_SUCCESS
495
496
497 if __name__ == "__main__":
498   sys.exit(main())