Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 3d8f154f

History | View | Annotate | Download (17.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
# Command-line option: how long (in seconds) the watcher is paused on every
# involved cluster while the merge runs (default: 30 minutes)
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Identifier passed as the ec_id to ConfigWriter.AddNodeGroup so that config
# entries created by this tool can be attributed to it
_CLUSTERMERGE_ECID = "clustermerge-ecid"
51

    
52

    
53
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded recursively; any non-list element (including
  tuples) is kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  result = []
  # Explicit stack instead of recursion; reversed so that elements are
  # popped (and therefore emitted) in their original order
  pending = list(reversed(unflattened_list))

  while pending:
    element = pending.pop()
    if isinstance(element, list):
      pending.extend(reversed(element))
    else:
      result.append(element)

  return result
68

    
69

    
70
class MergerData(object):
  """Bundles the per-cluster state collected for a merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the data describing one merging cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # The config path is only known once the remote config has been
    # fetched, hence it may start out as None
    self.config_path = config_path
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
89

    
90

    
91
class Merger(object):
  """Handling the merge.

  Drives the whole merge: gathers remote cluster state over ssh, shuts the
  merging clusters down, folds their configuration into the local one and
  re-adds their nodes to the local cluster.

  """
  def __init__(self, clusters, pause_period):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and configs; removed by Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Name of the local (surviving) cluster, as known by the master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Merging a cluster into itself makes no sense and would wreck the config
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # NOTE(review): batch/ask_key are overridden here for the initial
      # contact -- see ganeti.ssh.SshRunner.Run for the exact semantics
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the remote key locally so later commands can authenticate with it
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: If appending the key on a node fails

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a shell here-document on the remote node
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If shutting down the instances fails

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: If pausing the watcher fails

    """
    # "localhost" covers our own cluster; the watcher must not interfere
    # while daemons are stopped and the config is being merged
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If stopping the daemons fails

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If the remote config can not be read

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Keep a local copy of the foreign config for the merge step
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      # accept_foreign: the fetched config belongs to another master
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Merged-in nodes are added as non-master-candidates
        node_info.master_candidate = False
        my_config.AddNode(node_info, str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # NOTE(review): indices 1 and 3 appear to be the two endpoint
            # ports of the DRBD physical_id tuple -- confirm against
            # objects.Disk before changing
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info, str(fake_ec_id))
        fake_ec_id += 1

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
    for grp in other_config.GetAllNodeGroupsInfo().values():
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      # NOTE(review): presumably this skips the startup majority vote while
      # the merged nodes are still unreachable -- confirm with masterd docs
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    # Push the merged configuration out to all nodes
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback steps (rbsteps) to undo everything.

    """
    # Human-readable rollback instructions; printed on failure and cleared
    # once the merge passes the point of no return
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        # The rollback steps contain %(nodes)s/%(clusters)s placeholders,
        # filled in from the info dict below
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This removes the local copies of the remote private keys and configs
    by deleting the temporary work directory.

    """
    shutil.rmtree(self.work_dir)
456

    
457

    
458
def SetupLogging(options):
  """Configures the logging infrastructure.

  A stderr handler is attached to the root logger; its threshold is
  derived from the command line flags (debug > verbose > errors only).

  @param options: Parsed command line options

  """
  log_format = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  # Translate the flags into a handler threshold
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.ERROR

  handler = logging.StreamHandler()
  handler.setFormatter(log_format)
  handler.setLevel(level)

  # Filtering happens at the handler, so the root logger passes everything
  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
478

    
479

    
480
def main():
481
  """Main routine.
482

    
483
  """
484
  program = os.path.basename(sys.argv[0])
485

    
486
  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
487
                                        " [--watcher-pause-period SECONDS]"
488
                                        " <cluster> <cluster...>"),
489
                                        prog=program)
490
  parser.add_option(cli.DEBUG_OPT)
491
  parser.add_option(cli.VERBOSE_OPT)
492
  parser.add_option(PAUSE_PERIOD_OPT)
493

    
494
  (options, args) = parser.parse_args()
495

    
496
  SetupLogging(options)
497

    
498
  if not args:
499
    parser.error("No clusters specified")
500

    
501
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
502
  try:
503
    try:
504
      cluster_merger.Setup()
505
      cluster_merger.Merge()
506
    except errors.GenericError, e:
507
      logging.exception(e)
508
      return constants.EXIT_FAILURE
509
  finally:
510
    cluster_merger.Cleanup()
511

    
512
  return constants.EXIT_SUCCESS
513

    
514

    
515
# Script entry point; main()'s return value becomes the process exit code
if __name__ == "__main__":
  sys.exit(main())