Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 66a37e7a

History | View | Annotate | Download (30.2 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43
from ganeti import pathutils
44
from ganeti import compat
45

    
46

    
47
_GROUPS_MERGE = "merge"
48
_GROUPS_RENAME = "rename"
49
_CLUSTERMERGE_ECID = "clustermerge-ecid"
50
_RESTART_ALL = "all"
51
_RESTART_UP = "up"
52
_RESTART_NONE = "none"
53
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
54
_PARAMS_STRICT = "strict"
55
_PARAMS_WARN = "warn"
56
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
57

    
58

    
59
# Command-line option definitions; main() registers these with its parser.
PAUSE_PERIOD_OPT = cli.cli_option(
  "-p", "--watcher-pause-period", default=1800, action="store", type="int",
  dest="pause_period",
  help="Amount of time in seconds watcher should be suspended from running")

GROUPS_OPT = cli.cli_option(
  "--groups", default=None, metavar="STRATEGY",
  choices=(_GROUPS_MERGE, _GROUPS_RENAME), dest="groups",
  help="How to handle groups that have the same name (One of: %s/%s)" %
       (_GROUPS_MERGE, _GROUPS_RENAME))

PARAMS_OPT = cli.cli_option(
  "--parameter-conflicts", default=_PARAMS_STRICT, metavar="STRATEGY",
  choices=_PARAMS_CHOICES, dest="params",
  help="How to handle params that have different values (One of: %s/%s)" %
       _PARAMS_CHOICES)

RESTART_OPT = cli.cli_option(
  "--restart", default=_RESTART_ALL, metavar="STRATEGY",
  choices=_RESTART_CHOICES, dest="restart",
  help="How to handle restarting instances same name (One of: %s/%s/%s)" %
       _RESTART_CHOICES)

# Note: "store_false" means supplying the flag sets stop_instances to False.
SKIP_STOP_INSTANCES_OPT = cli.cli_option(
  "--skip-stop-instances", default=True, action="store_false",
  dest="stop_instances",
  help="Don't stop the instances on the clusters, just check that none is"
       " running")
91

    
92

    
93
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded recursively; any non-list item (including
  tuples) is kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened = []
  # Iterative depth-first walk; items are pushed reversed so that the
  # original left-to-right order is preserved in the output.
  pending = list(reversed(unflattened_list))
  while pending:
    item = pending.pop()
    if isinstance(item, list):
      pending.extend(reversed(item))
    else:
      flattened.append(item)
  return flattened
108

    
109

    
110
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    # Identity of the mergee cluster and how to reach it
    self.cluster = cluster
    self.master_node = master_node
    self.key_path = key_path
    # Inventory gathered during Merger.Setup()
    self.nodes = nodes
    self.instances = instances
    # Filled in later by Merger._FetchRemoteConfig()
    self.config_path = config_path
132

    
133

    
134
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking the mergee
  # clusters before the merge proceeds.
  RUNNING_STATUSES = compat.UniqueFrozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
142

    
143
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Scratch directory for fetched keys/configs; removed in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    if self.restart == _RESTART_UP:
      raise NotImplementedError
168

    
169
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the mergee's private key locally, readable by owner only
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0o600, data=result.stdout)

      # Gather the online nodes of the mergee
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = []
      for line in result.stdout.splitlines():
        fields = line.split(",")
        # Keep only nodes that are not offline ("N" in the offline column)
        if fields[1] == "N":
          nodes.append(fields[0])

      # Gather the mergee's instance names
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # Determine the mergee's master node from its ssconf
      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))
225

    
226
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    # Append our public key via a shell here-document on every online node
    append_cmd = "cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, append_cmd, private_key=data.key_path,
                              max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
247

    
248
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    Retries up to C{max_attempts} times, stopping early on the first
    successful run; the result of the last attempt is returned.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    # Clamp to at least one attempt: a non-positive max_attempts would
    # otherwise skip the loop entirely and leave "result" unbound, raising
    # a confusing UnboundLocalError on the return below.
    for _ in range(max(1, max_attempts)):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result
266

    
267
  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise
    @raise errors.RemoteError: If the instance status cannot be retrieved

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      # A failed command must not be treated as "no running instances":
      # its output would not match any known status and the merge would
      # proceed without actually having verified anything.
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instance statuses"
                                 " from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False
280

    
281
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If the shutdown fails on any mergee

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster,
                            "gnt-instance shutdown --all --force-multiple")
      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
293

    
294
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: If pausing the watcher fails anywhere

    """
    # "localhost" pauses the watcher on this (the merging) cluster too
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster,
                            "gnt-cluster watcher pause %d" % self.pause_period)
      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
306

    
307
  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: If deactivating a master IP fails

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")
      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node, result.fail_reason,
                                  result.output))
321

    
322
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If stopping the daemons fails on any node

    """
    stop_cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, stop_cmd, max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
335

    
336
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If the config cannot be read remotely

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster,
                            "cat %s" % pathutils.CLUSTER_CONF_FILE)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Keep a local copy; _MergeConfig() reads it through ConfigWriter
      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
355

    
356
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
368

    
369
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for disk in instance_info.disks:
          if disk.dev_type in constants.DTS_DRBD:
            port = my_config.AllocatePort()

            # Replace the port in both the logical and the physical id;
            # tuples are immutable, so rebuild them via lists
            lid = list(disk.logical_id)
            lid[2] = port
            disk.logical_id = tuple(lid)

            pid = list(disk.physical_id)
            pid[1] = pid[3] = port
            disk.physical_id = tuple(pid)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
410

    
411
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #

    # Mismatches here are errors only in strict mode ...
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # ... while mismatches here are always errors
    check_params_strict = [
      "volume_group_name",
      ]
    if my_cluster.IsFileStorageEnabled() or \
        other_cluster.IsFileStorageEnabled():
      check_params_strict.append("file_storage_dir")
    if my_cluster.IsSharedFileStorageEnabled() or \
        other_cluster.IsSharedFileStorageEnabled():
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    params_strict = (self.params == _PARAMS_STRICT)

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp) | set(other_cluster.os_hvp):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
557

    
558
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Looks up the OS-specific hypervisor parameters of a cluster.

    @return: the parameters for C{hyp} of C{os_name}, or C{None} when the
      OS has no entry for that hypervisor (or no entry at all)

    """
    return cluster.os_hvp.get(os_name, {}).get(hyp, None)
564

    
565
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    my_grp_names = frozenset(grp.name for grp in my_grps)
    conflicts = [grp for grp in other_grps if grp.name in my_grp_names]

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy, as membership changes under our feet
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to protected members of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
628

    
629
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    # daemon-util picks up the extra arguments from the environment
    if no_vote:
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
646

    
647
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        logging.info("Readding node %s", node)
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", node])
        if result.failed:
          # A failed readd is only logged; the remaining nodes are still
          # processed and the config redistributed below
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                         node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))
667

    
668
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
681

    
682
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
695

    
696
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      if self._CheckRunningInstances():
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError as e:
      logging.exception(e)

      if rbsteps:
        # Substitute the gathered node/cluster names into the recorded
        # rollback step templates
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
776

    
777
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # Everything fetched during the merge (keys, configs) lives below
    # work_dir, so a single recursive removal is enough
    shutil.rmtree(self.work_dir)
785

    
786

    
787
def main():
  """Main routine.

  @return: L{constants.EXIT_SUCCESS} on success, L{constants.EXIT_FAILURE}
    if the merge fails

  """
  program = os.path.basename(sys.argv[0])

  # Note: must be "%prog", not "%%prog" -- optparse substitutes the literal
  # substring "%prog" with the program name, so a doubled percent sign left
  # a stray "%" in front of the program name in the usage output.
  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  utils.SetupToolLogging(options.debug, options.verbose)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError as e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    # Always remove the scratch directory, even on failure
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS
824

    
825

    
826
# Script entry point: propagate main()'s status code to the shell.
if __name__ == "__main__":
  sys.exit(main())