Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ e1ab08db

History | View | Annotate | Download (16.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
46
                                  action="store", type="int",
47
                                  dest="pause_period",
48
                                  help=("Amount of time in seconds watcher"
49
                                        " should be suspended from running"))
50

    
51

    
52
def Flatten(unflatten_list):
53
  """Flattens a list.
54

    
55
  @param unflatten_list: A list of unflatten list objects.
56
  @return: A flatten list
57

    
58
  """
59
  flatten_list = []
60

    
61
  for item in unflatten_list:
62
    if isinstance(item, list):
63
      flatten_list.extend(Flatten(item))
64
    else:
65
      flatten_list.append(item)
66
  return flatten_list
67

    
68

    
69
class MergerData(object):
70
  """Container class to hold data used for merger.
71

    
72
  """
73
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
74
    """Initialize the container.
75

    
76
    @param cluster: The name of the cluster
77
    @param key_path: Path to the ssh private key used for authentication
78
    @param config_path: Path to the merging cluster config
79
    @param nodes: List of nodes in the merging cluster
80
    @param instances: List of instances running on merging cluster
81

    
82
    """
83
    self.cluster = cluster
84
    self.key_path = key_path
85
    self.config_path = config_path
86
    self.instances = instances
87
    self.nodes = nodes
88

    
89

    
90
class Merger(object):
91
  """Handling the merge.
92

    
93
  """
94
  def __init__(self, clusters, pause_period):
95
    """Initialize object with sane defaults and infos required.
96

    
97
    @param clusters: The list of clusters to merge in
98
    @param pause_period: The time watcher shall be disabled for
99

    
100
    """
101
    self.merger_data = []
102
    self.clusters = clusters
103
    self.pause_period = pause_period
104
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
105
    self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
106
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
107

    
108
  def Setup(self):
109
    """Sets up our end so we can do the merger.
110

    
111
    This method is setting us up as a preparation for the merger.
112
    It makes the initial contact and gathers information needed.
113

    
114
    @raise errors.RemoteError: for errors in communication/grabbing
115

    
116
    """
117
    (remote_path, _, _) = ssh.GetUserFiles("root")
118

    
119
    # Fetch remotes private key
120
    for cluster in self.clusters:
121
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
122
                            ask_key=False)
123
      if result.failed:
124
        raise errors.RemoteError("There was an error while grabbing ssh private"
125
                                 " key from %s. Fail reason: %s; output: %s" %
126
                                 (cluster, result.fail_reason, result.output))
127

    
128
      key_path = utils.PathJoin(self.work_dir, cluster)
129
      utils.WriteFile(key_path, mode=0600, data=result.stdout)
130

    
131
      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
132
                            private_key=key_path)
133
      if result.failed:
134
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
135
                                 " Fail reason: %s; output: %s" %
136
                                 (cluster, result.fail_reason, result.output))
137
      nodes = result.stdout.splitlines()
138

    
139
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
140
                            private_key=key_path)
141
      if result.failed:
142
        raise errors.RemoteError("Unable to retrieve list of instances from"
143
                                 " %s. Fail reason: %s; output: %s" %
144
                                 (cluster, result.fail_reason, result.output))
145
      instances = result.stdout.splitlines()
146

    
147
      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
148

    
149
  def _PrepareAuthorizedKeys(self):
150
    """Prepare the authorized_keys on every merging node.
151

    
152
    This method add our public key to remotes authorized_key for further
153
    communication.
154

    
155
    """
156
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
157
    pub_key = utils.ReadFile(pub_key_file)
158

    
159
    for data in self.merger_data:
160
      for node in data.nodes:
161
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
162
                                     (auth_keys, pub_key)),
163
                              private_key=data.key_path)
164

    
165
        if result.failed:
166
          raise errors.RemoteError("Unable to add our public key to %s in %s."
167
                                   " Fail reason: %s; output: %s" %
168
                                   (node, data.cluster, result.fail_reason,
169
                                    result.output))
170

    
171
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
172
              strict_host_check=False, private_key=None, batch=True,
173
              ask_key=False):
174
    """Wrapping SshRunner.Run with default parameters.
175

    
176
    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
177

    
178
    """
179
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
180
                               use_cluster_key=use_cluster_key,
181
                               strict_host_check=strict_host_check,
182
                               private_key=private_key, batch=batch,
183
                               ask_key=ask_key)
184

    
185
  def _StopMergingInstances(self):
186
    """Stop instances on merging clusters.
187

    
188
    """
189
    for cluster in self.clusters:
190
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
191
                                     " --force-multiple")
192

    
193
      if result.failed:
194
        raise errors.RemoteError("Unable to stop instances on %s."
195
                                 " Fail reason: %s; output: %s" %
196
                                 (cluster, result.fail_reason, result.output))
197

    
198
  def _DisableWatcher(self):
199
    """Disable watch on all merging clusters, including ourself.
200

    
201
    """
202
    for cluster in ["localhost"] + self.clusters:
203
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
204
                                     self.pause_period)
205

    
206
      if result.failed:
207
        raise errors.RemoteError("Unable to pause watcher on %s."
208
                                 " Fail reason: %s; output: %s" %
209
                                 (cluster, result.fail_reason, result.output))
210

    
211
  def _StopDaemons(self):
212
    """Stop all daemons on merging nodes.
213

    
214
    """
215
    cmd = "%s stop-all" % constants.DAEMON_UTIL
216
    for data in self.merger_data:
217
      for node in data.nodes:
218
        result = self._RunCmd(node, cmd)
219

    
220
        if result.failed:
221
          raise errors.RemoteError("Unable to stop daemons on %s."
222
                                   " Fail reason: %s; output: %s." %
223
                                   (node, result.fail_reason, result.output))
224

    
225
  def _FetchRemoteConfig(self):
226
    """Fetches and stores remote cluster config from the master.
227

    
228
    This step is needed before we can merge the config.
229

    
230
    """
231
    for data in self.merger_data:
232
      result = self._RunCmd(data.cluster, "cat %s" %
233
                                          constants.CLUSTER_CONF_FILE)
234

    
235
      if result.failed:
236
        raise errors.RemoteError("Unable to retrieve remote config on %s."
237
                                 " Fail reason: %s; output %s" %
238
                                 (data.cluster, result.fail_reason,
239
                                  result.output))
240

    
241
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
242
                                        data.cluster)
243
      utils.WriteFile(data.config_path, data=result.stdout)
244

    
245
  # R0201: Method could be a function
246
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
247
    """Kills the local master daemon.
248

    
249
    @raise errors.CommandError: If unable to kill
250

    
251
    """
252
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
253
    if result.failed:
254
      raise errors.CommandError("Unable to stop master daemons."
255
                                " Fail reason: %s; output: %s" %
256
                                (result.fail_reason, result.output))
257

    
258
  def _MergeConfig(self):
259
    """Merges all foreign config into our own config.
260

    
261
    """
262
    my_config = config.ConfigWriter(offline=True)
263
    fake_ec_id = 0 # Needs to be uniq over the whole config merge
264

    
265
    for data in self.merger_data:
266
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
267

    
268
      for node in other_config.GetNodeList():
269
        node_info = other_config.GetNodeInfo(node)
270
        node_info.master_candidate = False
271
        my_config.AddNode(node_info, str(fake_ec_id))
272
        fake_ec_id += 1
273

    
274
      for instance in other_config.GetInstanceList():
275
        instance_info = other_config.GetInstanceInfo(instance)
276

    
277
        # Update the DRBD port assignments
278
        # This is a little bit hackish
279
        for dsk in instance_info.disks:
280
          if dsk.dev_type in constants.LDS_DRBD:
281
            port = my_config.AllocatePort()
282

    
283
            logical_id = list(dsk.logical_id)
284
            logical_id[2] = port
285
            dsk.logical_id = tuple(logical_id)
286

    
287
            physical_id = list(dsk.physical_id)
288
            physical_id[1] = physical_id[3] = port
289
            dsk.physical_id = tuple(physical_id)
290

    
291
        my_config.AddInstance(instance_info, str(fake_ec_id))
292
        fake_ec_id += 1
293

    
294
  # R0201: Method could be a function
295
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
296
    """Starts the local master daemon.
297

    
298
    @param no_vote: Should the masterd started without voting? default: False
299
    @raise errors.CommandError: If unable to start daemon.
300

    
301
    """
302
    env = {}
303
    if no_vote:
304
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
305

    
306
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
307
    if result.failed:
308
      raise errors.CommandError("Couldn't start ganeti master."
309
                                " Fail reason: %s; output: %s" %
310
                                (result.fail_reason, result.output))
311

    
312
  def _ReaddMergedNodesAndRedist(self):
313
    """Readds all merging nodes and make sure their config is up-to-date.
314

    
315
    @raise errors.CommandError: If anything fails.
316

    
317
    """
318
    for data in self.merger_data:
319
      for node in data.nodes:
320
        result = utils.RunCmd(["gnt-node", "add", "--readd",
321
                               "--no-ssh-key-check", node])
322
        if result.failed:
323
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
324
                                    " output: %s" % (node, result.fail_reason,
325
                                                     result.output))
326

    
327
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
328
    if result.failed:
329
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
330
                                " output: %s" % (result.fail_reason,
331
                                                result.output))
332

    
333
  # R0201: Method could be a function
334
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
335
    """Starts up all instances (locally).
336

    
337
    @raise errors.CommandError: If unable to start clusters
338

    
339
    """
340
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
341
                           "--force-multiple"])
342
    if result.failed:
343
      raise errors.CommandError("Unable to start all instances."
344
                                " Fail reason: %s; output: %s" %
345
                                (result.fail_reason, result.output))
346

    
347
  # R0201: Method could be a function
348
  def _VerifyCluster(self): # pylint: disable-msg=R0201
349
    """Runs gnt-cluster verify to verify the health.
350

    
351
    @raise errors.ProgrammError: If cluster fails on verification
352

    
353
    """
354
    result = utils.RunCmd(["gnt-cluster", "verify"])
355
    if result.failed:
356
      raise errors.CommandError("Verification of cluster failed."
357
                                " Fail reason: %s; output: %s" %
358
                                (result.fail_reason, result.output))
359

    
360
  def Merge(self):
361
    """Does the actual merge.
362

    
363
    It runs all the steps in the right order and updates the user about steps
364
    taken. Also it keeps track of rollback_steps to undo everything.
365

    
366
    """
367
    rbsteps = []
368
    try:
369
      logging.info("Pre cluster verification")
370
      self._VerifyCluster()
371

    
372
      logging.info("Prepare authorized_keys")
373
      rbsteps.append("Remove our key from authorized_keys on nodes:"
374
                     " %(nodes)s")
375
      self._PrepareAuthorizedKeys()
376

    
377
      rbsteps.append("Start all instances again on the merging"
378
                     " clusters: %(clusters)s")
379
      logging.info("Stopping merging instances (takes a while)")
380
      self._StopMergingInstances()
381

    
382
      logging.info("Disable watcher")
383
      self._DisableWatcher()
384
      logging.info("Stop daemons on merging nodes")
385
      self._StopDaemons()
386
      logging.info("Merging config")
387
      self._FetchRemoteConfig()
388

    
389
      def _OfflineClusterMerge(_):
390
        """Closure run when master daemons stopped
391

    
392
        """
393
        rbsteps.append("Restore %s from another master candidate" %
394
                       constants.CLUSTER_CONF_FILE)
395
        self._MergeConfig()
396
        self._StartMasterDaemon(no_vote=True)
397

    
398
        # Point of no return, delete rbsteps
399
        del rbsteps[:]
400

    
401
        logging.warning("We are at the point of no return. Merge can not easily"
402
                        " be undone after this point.")
403
        logging.info("Readd nodes and redistribute config")
404
        self._ReaddMergedNodesAndRedist()
405
        self._KillMasterDaemon()
406

    
407
      cli.RunWhileClusterStopped(logging.info, _OfflineClusterMerge)
408

    
409
      logging.info("Starting instances again")
410
      self._StartupAllInstances()
411
      logging.info("Post cluster verification")
412
      self._VerifyCluster()
413
    except errors.GenericError, e:
414
      logging.exception(e)
415

    
416
      if rbsteps:
417
        nodes = Flatten([data.nodes for data in self.merger_data])
418
        info = {
419
          "clusters": self.clusters,
420
          "nodes": nodes,
421
          }
422
        logging.critical("In order to rollback do the following:")
423
        for step in rbsteps:
424
          logging.critical("  * %s", step % info)
425
      else:
426
        logging.critical("Nothing to rollback.")
427

    
428
      # TODO: Keep track of steps done for a flawless resume?
429

    
430
  def Cleanup(self):
431
    """Clean up our environment.
432

    
433
    This cleans up remote private keys and configs and after that
434
    deletes the temporary directory.
435

    
436
    """
437
    shutil.rmtree(self.work_dir)
438

    
439

    
440
def SetupLogging(options):
441
  """Setting up logging infrastructure.
442

    
443
  @param options: Parsed command line options
444

    
445
  """
446
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
447

    
448
  stderr_handler = logging.StreamHandler()
449
  stderr_handler.setFormatter(formatter)
450
  if options.debug:
451
    stderr_handler.setLevel(logging.NOTSET)
452
  elif options.verbose:
453
    stderr_handler.setLevel(logging.INFO)
454
  else:
455
    stderr_handler.setLevel(logging.ERROR)
456

    
457
  root_logger = logging.getLogger("")
458
  root_logger.setLevel(logging.NOTSET)
459
  root_logger.addHandler(stderr_handler)
460

    
461

    
462
def main():
463
  """Main routine.
464

    
465
  """
466
  program = os.path.basename(sys.argv[0])
467

    
468
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
469
                                        " [--watcher-pause-period SECONDS]"
470
                                        " <cluster> <cluster...>"),
471
                                        prog=program)
472
  parser.add_option(cli.DEBUG_OPT)
473
  parser.add_option(cli.VERBOSE_OPT)
474
  parser.add_option(PAUSE_PERIOD_OPT)
475

    
476
  (options, args) = parser.parse_args()
477

    
478
  SetupLogging(options)
479

    
480
  if not args:
481
    parser.error("No clusters specified")
482

    
483
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
484
  try:
485
    try:
486
      cluster_merger.Setup()
487
      cluster_merger.Merge()
488
    except errors.GenericError, e:
489
      logging.exception(e)
490
      return constants.EXIT_FAILURE
491
  finally:
492
    cluster_merger.Cleanup()
493

    
494
  return constants.EXIT_SUCCESS
495

    
496

    
497
if __name__ == "__main__":
498
  sys.exit(main())