#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge
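
# Example invocation (the cluster name below is hypothetical; the accepted
# options are defined further down and in main()):
#   cluster-merge --watcher-pause-period 3600 --groups rename \
#     cluster2.example.com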

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"

PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on the merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize the object with sane defaults and the information required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time the watcher shall be disabled for
    @param groups: How to handle group conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets us up as a preparation for the merger. It makes the
    initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch the remote clusters' private keys
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote nodes' authorized_keys for
    further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
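        # The public key is appended through a quoted heredoc so its
        # contents reach the remote shell verbatim, without expansion.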
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapper around SshRunner.Run with default parameters.

    For an explanation of the parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores the remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
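        # logical_id[2] and physical_id[1]/[3] carry the DRBD network port;
        # allocate a fresh port from the local cluster's pool so the merged
        # disks do not collide with ports already in use here.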
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups.

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s" % self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s" %
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      if self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict" % (grp.name, new_name))
          grp.name = new_name

    for grp in other_grps:
      # TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should masterd be started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about the
    steps taken. It also keeps track of rollback steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
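      # Start masterd without voting: the nodes just added to our config most
      # likely do not consider us their master yet, so a voting startup could
      # fail at this point.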
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up the fetched private keys and configs by deleting the
    temporary directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Sets up the logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.ERROR)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())