Revision 1f7d3f7d

b/Makefile.am
233 233
	tools/burnin \
234 234
	tools/cfgshell \
235 235
	tools/cfgupgrade \
236
	tools/cluster-merge \
236 237
	tools/lvmstrap
237 238

  
238 239
pkglib_SCRIPTS = \
b/tools/cluster-merge
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
"""Tool to merge two or more clusters together.
22

  
23
The clusters have to run the same version of Ganeti!
24

  
25
"""
26

  
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

  
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

  
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

  
44

  
45
# Command line option controlling how long (in seconds) the Ganeti watcher
# is kept paused on every involved cluster while the merge runs; default
# is 1800 (30 minutes).
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
50

  
51

  
52
def Flatten(unflatten_list):
  """Recursively flattens arbitrarily nested lists into one flat list.

  @param unflatten_list: A list possibly containing nested lists
  @return: A flat list with all non-list items in original order

  """
  result = []
  for element in unflatten_list:
    if not isinstance(element, list):
      result.append(element)
    else:
      result.extend(Flatten(element))
  return result
67

  
68

  
69
class MergerData(object):
  """Plain record describing one cluster that is being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the per-cluster data gathered during merge preparation.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config (filled in
        later, once the remote config has been fetched)

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
88

  
89

  
90
class Merger(object):
  """Drives the merge of one or more foreign clusters into the local one.

  Usage: call L{Setup} to gather data from the remote clusters, then
  L{Merge} to perform the merge, and finally L{Cleanup} to remove
  temporary files.

  """
  def __init__(self, clusters, pause_period):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time (in seconds) the watcher shall be
        disabled for

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and configs; removed in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # NOTE(review): QueryConfigValues returns a list of values, so this
    # stores a one-element list rather than the bare name — verify that
    # ssh.SshRunner accepts that, or unpack with (name,) = ...
    self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact, grabs each remote cluster's root ssh
    private key and collects its node and instance lists into
    L{MergerData} objects stored in C{self.merger_data}.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Fetch remotes private key
    for cluster in self.clusters:
      # batch=False/ask_key=False: this first contact must not prompt
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the fetched key locally; later commands authenticate with it
      key_path = os.path.join(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote authorized_keys for
    further communication.

    @raise errors.RemoteError: If the key could not be added on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a remote heredoc
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If instances could not be stopped

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watcher on all merging clusters, including ourself.

    @raise errors.RemoteError: If the watcher could not be paused

    """
    # "localhost" pauses our own watcher as well
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  # R0201: Method could be a function
  def _EnableWatcher(self): # pylint: disable-msg=R0201
    """Reenable watcher (locally).

    Failure is only logged as a warning, not raised, since the watcher
    pause expires by itself after the configured period.

    """
    result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])

    if result.failed:
      logging.warning("Unable to continue watcher. Fail reason: %s;"
                      " output: %s" % (result.fail_reason,
                                       result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If daemons could not be stopped

    """
    # FIXME: Worth to put this into constants?
    cmds = []
    for daemon in (constants.RAPI, constants.MASTERD,
                   constants.NODED, constants.CONFD):
      cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
    for data in self.merger_data:
      for node in data.nodes:
        # Run all stop commands in one ssh invocation
        result = self._RunCmd(node, " && ".join(cmds))

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config. The fetched
    config is written into the work directory and its path recorded in
    each L{MergerData}.

    @raise errors.RemoteError: If the remote config could not be read

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = os.path.join(self.work_dir, "%s_config.data" %
                                                     data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Nodes are added as non-master-candidates; DRBD disk ports are
    reallocated from our own config to avoid collisions.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Merged nodes must not stay master candidates of their old cluster
        node_info.master_candidate = False
        my_config.AddNode(node_info, str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            # logical_id/physical_id are tuples; rebuild them with the
            # newly allocated port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info, str(fake_ec_id))
        fake_ec_id += 1

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should masterd be started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      # Rollback step messages are %-formatted later with cluster/node info
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      # Master must be down while we modify the config offline
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # no_vote: remaining masterd's would outvote us after the merge
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes and redistribute config")
      self._ReaddMergedNodesAndRedist()
      # Restart master normally (with voting) now that nodes are readded
      self._KillMasterDaemon()
      self._StartMasterDaemon()
      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s" % (step % info))
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)
450

  
451

  
452
def SetupLogging(options):
  """Configure logging to stderr according to command line verbosity.

  @param options: Parsed command line options

  """
  # Pick the handler threshold from the verbosity flags
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(level)

  root = logging.getLogger("")
  root.setLevel(logging.NOTSET)
  root.addHandler(handler)
472

  
473

  
474
def main():
475
  """Main routine.
476

  
477
  """
478
  program = os.path.basename(sys.argv[0])
479

  
480
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
481
                                        " [--watcher-pause-period SECONDS]"
482
                                        " <cluster> <cluster...>"),
483
                                        prog=program)
484
  parser.add_option(cli.DEBUG_OPT)
485
  parser.add_option(cli.VERBOSE_OPT)
486
  parser.add_option(PAUSE_PERIOD_OPT)
487

  
488
  (options, args) = parser.parse_args()
489

  
490
  SetupLogging(options)
491

  
492
  if not args:
493
    parser.error("No clusters specified")
494

  
495
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
496
  try:
497
    try:
498
      cluster_merger.Setup()
499
      cluster_merger.Merge()
500
    except errors.GenericError, e:
501
      logging.exception(e)
502
      return constants.EXIT_FAILURE
503
  finally:
504
    cluster_merger.Cleanup()
505

  
506
  return constants.EXIT_SUCCESS
507

  
508

  
509
if __name__ == "__main__":
510
  sys.exit(main())

Also available in: Unified diff