Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 454723b5

History | View | Annotate | Download (17.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
# Command line option controlling how long (in seconds) the Ganeti watcher
# is suspended on all involved clusters while the merge runs; the watcher
# would otherwise try to restart the instances/daemons we deliberately stop.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
50

    
51

    
52
def Flatten(unflatten_list):
  """Flattens a list.

  Nested lists are expanded recursively; any non-list element is kept
  as-is, in its original order.

  @param unflatten_list: A list of unflatten list objects.
  @return: A flatten list

  """
  result = []
  # Iterative worklist instead of recursion; prepending a nested list's
  # elements keeps the original left-to-right order
  pending = list(unflatten_list)
  while pending:
    element = pending.pop(0)
    if isinstance(element, list):
      pending[:0] = element
    else:
      result.append(element)
  return result
67

    
68

    
69
class MergerData(object):
  """Per-cluster record of the data gathered for a merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the supplied merge data.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # Identity of and credentials for the merging cluster
    self.cluster = cluster
    self.key_path = key_path
    # Inventory retrieved from the remote cluster
    self.nodes = nodes
    self.instances = instances
    # Remains None until the remote config has been fetched locally
    self.config_path = config_path
88

    
89

    
90
class Merger(object):
91
  """Handling the merge.
92

    
93
  """
94
  def __init__(self, clusters, pause_period):
95
    """Initialize object with sane defaults and infos required.
96

    
97
    @param clusters: The list of clusters to merge in
98
    @param pause_period: The time watcher shall be disabled for
99

    
100
    """
101
    self.merger_data = []
102
    self.clusters = clusters
103
    self.pause_period = pause_period
104
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105
    self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
107

    
108
  def Setup(self):
109
    """Sets up our end so we can do the merger.
110

    
111
    This method is setting us up as a preparation for the merger.
112
    It makes the initial contact and gathers information needed.
113

    
114
    @raise errors.RemoteError: for errors in communication/grabbing
115

    
116
    """
117
    (remote_path, _, _) = ssh.GetUserFiles("root")
118

    
119
    # Fetch remotes private key
120
    for cluster in self.clusters:
121
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
122
                            ask_key=False)
123
      if result.failed:
124
        raise errors.RemoteError("There was an error while grabbing ssh private"
125
                                 " key from %s. Fail reason: %s; output: %s" %
126
                                 (cluster, result.fail_reason, result.output))
127

    
128
      key_path = utils.PathJoin(self.work_dir, cluster)
129
      utils.WriteFile(key_path, mode=0600, data=result.stdout)
130

    
131
      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132
                            private_key=key_path)
133
      if result.failed:
134
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135
                                 " Fail reason: %s; output: %s" %
136
                                 (cluster, result.fail_reason, result.output))
137
      nodes = result.stdout.splitlines()
138

    
139
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140
                            private_key=key_path)
141
      if result.failed:
142
        raise errors.RemoteError("Unable to retrieve list of instances from"
143
                                 " %s. Fail reason: %s; output: %s" %
144
                                 (cluster, result.fail_reason, result.output))
145
      instances = result.stdout.splitlines()
146

    
147
      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148

    
149
  def _PrepareAuthorizedKeys(self):
150
    """Prepare the authorized_keys on every merging node.
151

    
152
    This method add our public key to remotes authorized_key for further
153
    communication.
154

    
155
    """
156
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157
    pub_key = utils.ReadFile(pub_key_file)
158

    
159
    for data in self.merger_data:
160
      for node in data.nodes:
161
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162
                                     (auth_keys, pub_key)),
163
                              private_key=data.key_path)
164

    
165
        if result.failed:
166
          raise errors.RemoteError("Unable to add our public key to %s in %s."
167
                                   " Fail reason: %s; output: %s" %
168
                                   (node, data.cluster, result.fail_reason,
169
                                    result.output))
170

    
171
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172
              strict_host_check=False, private_key=None, batch=True,
173
              ask_key=False):
174
    """Wrapping SshRunner.Run with default parameters.
175

    
176
    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
177

    
178
    """
179
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180
                               use_cluster_key=use_cluster_key,
181
                               strict_host_check=strict_host_check,
182
                               private_key=private_key, batch=batch,
183
                               ask_key=ask_key)
184

    
185
  def _StopMergingInstances(self):
186
    """Stop instances on merging clusters.
187

    
188
    """
189
    for cluster in self.clusters:
190
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
191
                                     " --force-multiple")
192

    
193
      if result.failed:
194
        raise errors.RemoteError("Unable to stop instances on %s."
195
                                 " Fail reason: %s; output: %s" %
196
                                 (cluster, result.fail_reason, result.output))
197

    
198
  def _DisableWatcher(self):
199
    """Disable watch on all merging clusters, including ourself.
200

    
201
    """
202
    for cluster in ["localhost"] + self.clusters:
203
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
204
                                     self.pause_period)
205

    
206
      if result.failed:
207
        raise errors.RemoteError("Unable to pause watcher on %s."
208
                                 " Fail reason: %s; output: %s" %
209
                                 (cluster, result.fail_reason, result.output))
210

    
211

    
212
  # R0201: Method could be a function
213
  def _EnableWatcher(self): # pylint: disable-msg=R0201
214
    """Reenable watcher (locally).
215

    
216
    """
217
    result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
218

    
219
    if result.failed:
220
      logging.warning("Unable to continue watcher. Fail reason: %s;"
221
                      " output: %s", result.fail_reason, result.output)
222

    
223
  def _StopDaemons(self):
224
    """Stop all daemons on merging nodes.
225

    
226
    """
227
    # FIXME: Worth to put this into constants?
228
    cmds = []
229
    for daemon in (constants.RAPI, constants.MASTERD,
230
                   constants.NODED, constants.CONFD):
231
      cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
232
    for data in self.merger_data:
233
      for node in data.nodes:
234
        result = self._RunCmd(node, " && ".join(cmds))
235

    
236
        if result.failed:
237
          raise errors.RemoteError("Unable to stop daemons on %s."
238
                                   " Fail reason: %s; output: %s." %
239
                                   (node, result.fail_reason, result.output))
240

    
241
  def _FetchRemoteConfig(self):
242
    """Fetches and stores remote cluster config from the master.
243

    
244
    This step is needed before we can merge the config.
245

    
246
    """
247
    for data in self.merger_data:
248
      result = self._RunCmd(data.cluster, "cat %s" %
249
                                          constants.CLUSTER_CONF_FILE)
250

    
251
      if result.failed:
252
        raise errors.RemoteError("Unable to retrieve remote config on %s."
253
                                 " Fail reason: %s; output %s" %
254
                                 (data.cluster, result.fail_reason,
255
                                  result.output))
256

    
257
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258
                                        data.cluster)
259
      utils.WriteFile(data.config_path, data=result.stdout)
260

    
261
  # R0201: Method could be a function
262
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263
    """Kills the local master daemon.
264

    
265
    @raise errors.CommandError: If unable to kill
266

    
267
    """
268
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269
    if result.failed:
270
      raise errors.CommandError("Unable to stop master daemons."
271
                                " Fail reason: %s; output: %s" %
272
                                (result.fail_reason, result.output))
273

    
274
  def _MergeConfig(self):
275
    """Merges all foreign config into our own config.
276

    
277
    """
278
    my_config = config.ConfigWriter(offline=True)
279
    fake_ec_id = 0 # Needs to be uniq over the whole config merge
280

    
281
    for data in self.merger_data:
282
      other_config = config.ConfigWriter(data.config_path)
283

    
284
      for node in other_config.GetNodeList():
285
        node_info = other_config.GetNodeInfo(node)
286
        node_info.master_candidate = False
287
        my_config.AddNode(node_info, str(fake_ec_id))
288
        fake_ec_id += 1
289

    
290
      for instance in other_config.GetInstanceList():
291
        instance_info = other_config.GetInstanceInfo(instance)
292

    
293
        # Update the DRBD port assignments
294
        # This is a little bit hackish
295
        for dsk in instance_info.disks:
296
          if dsk.dev_type in constants.LDS_DRBD:
297
            port = my_config.AllocatePort()
298

    
299
            logical_id = list(dsk.logical_id)
300
            logical_id[2] = port
301
            dsk.logical_id = tuple(logical_id)
302

    
303
            physical_id = list(dsk.physical_id)
304
            physical_id[1] = physical_id[3] = port
305
            dsk.physical_id = tuple(physical_id)
306

    
307
        my_config.AddInstance(instance_info, str(fake_ec_id))
308
        fake_ec_id += 1
309

    
310
  # R0201: Method could be a function
311
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
312
    """Starts the local master daemon.
313

    
314
    @param no_vote: Should the masterd started without voting? default: False
315
    @raise errors.CommandError: If unable to start daemon.
316

    
317
    """
318
    env = {}
319
    if no_vote:
320
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
321

    
322
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
323
    if result.failed:
324
      raise errors.CommandError("Couldn't start ganeti master."
325
                                " Fail reason: %s; output: %s" %
326
                                (result.fail_reason, result.output))
327

    
328
  def _ReaddMergedNodesAndRedist(self):
329
    """Readds all merging nodes and make sure their config is up-to-date.
330

    
331
    @raise errors.CommandError: If anything fails.
332

    
333
    """
334
    for data in self.merger_data:
335
      for node in data.nodes:
336
        result = utils.RunCmd(["gnt-node", "add", "--readd",
337
                               "--no-ssh-key-check", node])
338
        if result.failed:
339
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
340
                                    " output: %s" % (node, result.fail_reason,
341
                                                     result.output))
342

    
343
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
344
    if result.failed:
345
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
346
                                " output: %s" % (result.fail_reason,
347
                                                result.output))
348

    
349
  # R0201: Method could be a function
350
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
351
    """Starts up all instances (locally).
352

    
353
    @raise errors.CommandError: If unable to start clusters
354

    
355
    """
356
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
357
                           "--force-multiple"])
358
    if result.failed:
359
      raise errors.CommandError("Unable to start all instances."
360
                                " Fail reason: %s; output: %s" %
361
                                (result.fail_reason, result.output))
362

    
363
  # R0201: Method could be a function
364
  def _VerifyCluster(self): # pylint: disable-msg=R0201
365
    """Runs gnt-cluster verify to verify the health.
366

    
367
    @raise errors.ProgrammError: If cluster fails on verification
368

    
369
    """
370
    result = utils.RunCmd(["gnt-cluster", "verify"])
371
    if result.failed:
372
      raise errors.CommandError("Verification of cluster failed."
373
                                " Fail reason: %s; output: %s" %
374
                                (result.fail_reason, result.output))
375

    
376
  def Merge(self):
377
    """Does the actual merge.
378

    
379
    It runs all the steps in the right order and updates the user about steps
380
    taken. Also it keeps track of rollback_steps to undo everything.
381

    
382
    """
383
    rbsteps = []
384
    try:
385
      logging.info("Pre cluster verification")
386
      self._VerifyCluster()
387

    
388
      logging.info("Prepare authorized_keys")
389
      rbsteps.append("Remove our key from authorized_keys on nodes:"
390
                     " %(nodes)s")
391
      self._PrepareAuthorizedKeys()
392

    
393
      rbsteps.append("Start all instances again on the merging"
394
                     " clusters: %(clusters)s")
395
      logging.info("Stopping merging instances (takes a while)")
396
      self._StopMergingInstances()
397

    
398
      logging.info("Disable watcher")
399
      self._DisableWatcher()
400
      logging.info("Stop daemons on merging nodes")
401
      self._StopDaemons()
402
      logging.info("Merging config")
403
      self._FetchRemoteConfig()
404
      self._KillMasterDaemon()
405

    
406
      rbsteps.append("Restore %s from another master candidate" %
407
                     constants.CLUSTER_CONF_FILE)
408
      self._MergeConfig()
409
      self._StartMasterDaemon(no_vote=True)
410

    
411
      # Point of no return, delete rbsteps
412
      del rbsteps[:]
413

    
414
      logging.warning("We are at the point of no return. Merge can not easily"
415
                      " be undone after this point.")
416
      logging.info("Readd nodes and redistribute config")
417
      self._ReaddMergedNodesAndRedist()
418
      self._KillMasterDaemon()
419
      self._StartMasterDaemon()
420
      logging.info("Starting instances again")
421
      self._StartupAllInstances()
422
      logging.info("Post cluster verification")
423
      self._VerifyCluster()
424
    except errors.GenericError, e:
425
      logging.exception(e)
426

    
427
      if rbsteps:
428
        nodes = Flatten([data.nodes for data in self.merger_data])
429
        info = {
430
          "clusters": self.clusters,
431
          "nodes": nodes,
432
          }
433
        logging.critical("In order to rollback do the following:")
434
        for step in rbsteps:
435
          logging.critical("  * %s", step % info)
436
      else:
437
        logging.critical("Nothing to rollback.")
438

    
439
      # TODO: Keep track of steps done for a flawless resume?
440

    
441
  def Cleanup(self):
442
    """Clean up our environment.
443

    
444
    This cleans up remote private keys and configs and after that
445
    deletes the temporary directory.
446

    
447
    """
448
    shutil.rmtree(self.work_dir)
449

    
450

    
451
def SetupLogging(options):
  """Configure the logging infrastructure.

  @param options: Parsed command line options

  """
  # The command line flags only control the stderr handler's verbosity
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(level)

  # The root logger forwards everything; filtering is done in the handler
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
471

    
472

    
473
def main():
474
  """Main routine.
475

    
476
  """
477
  program = os.path.basename(sys.argv[0])
478

    
479
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
480
                                        " [--watcher-pause-period SECONDS]"
481
                                        " <cluster> <cluster...>"),
482
                                        prog=program)
483
  parser.add_option(cli.DEBUG_OPT)
484
  parser.add_option(cli.VERBOSE_OPT)
485
  parser.add_option(PAUSE_PERIOD_OPT)
486

    
487
  (options, args) = parser.parse_args()
488

    
489
  SetupLogging(options)
490

    
491
  if not args:
492
    parser.error("No clusters specified")
493

    
494
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
495
  try:
496
    try:
497
      cluster_merger.Setup()
498
      cluster_merger.Merge()
499
    except errors.GenericError, e:
500
      logging.exception(e)
501
      return constants.EXIT_FAILURE
502
  finally:
503
    cluster_merger.Cleanup()
504

    
505
  return constants.EXIT_SUCCESS
506

    
507

    
508
if __name__ == "__main__":
509
  sys.exit(main())