Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ e687ec01

History | View | Annotate | Download (29.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
_GROUPS_MERGE = "merge"
46
_GROUPS_RENAME = "rename"
47
_CLUSTERMERGE_ECID = "clustermerge-ecid"
48
_RESTART_ALL = "all"
49
_RESTART_UP = "up"
50
_RESTART_NONE = "none"
51
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52
_PARAMS_STRICT = "strict"
53
_PARAMS_WARN = "warn"
54
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55

    
56

    
57
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# The help text used to read "How to handle restarting instances same name",
# a copy/paste leftover from GROUPS_OPT; it now describes the restart choice.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# Passing --skip-stop-instances *clears* stop_instances (action="store_false");
# by default the instances on the mergee clusters are stopped.
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
89

    
90

    
91
def Flatten(unflattened_list):
  """Recursively flattens nested lists into one flat list.

  Only C{list} instances are flattened; any other iterable (e.g. a
  tuple) is kept as a single item.

  @param unflattened_list: A list possibly containing other (nested) lists
  @return: A flat list with all non-list items in their original order

  """
  result = []
  for element in unflattened_list:
    if not isinstance(element, list):
      result.append(element)
    else:
      result.extend(Flatten(element))
  return result
106

    
107

    
108
class MergerData(object):
  """Per-cluster record of the state gathered while merging.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the data describing one mergee cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # Plain attribute assignments; the attributes are filled in/updated
    # later by Merger (e.g. config_path after the config has been fetched)
    self.config_path = config_path
    self.instances = instances
    self.nodes = nodes
    self.key_path = key_path
    self.cluster = cluster
127

    
128

    
129
class Merger(object):
  """Handling the merge.

  """
  # Instance states treated as "running" when verifying that the mergee
  # clusters have been fully quiesced before the merge proceeds
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
137

    
138
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle cluster parameter conflicts (one of
                   strict/warn, see the --parameter-conflicts option)
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched keys/configs; removed again in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Ask the local master daemon for our own cluster name
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    if self.restart == _RESTART_UP:
      # Restarting only previously-running instances is not implemented yet
      raise NotImplementedError
163

    
164
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    For each mergee cluster it fetches the root ssh private key, the list
    of online nodes and the list of instances, and records them in a
    L{MergerData} entry of C{self.merger_data}.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the key locally (0600: readable by root only) so later
      # commands can authenticate against the mergee's nodes
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      # Keep only nodes whose "offline" column is "N", i.e. online nodes
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
210

    
211
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote authorized_keys for
    further communication.

    @raise errors.RemoteError: If appending the key fails on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append the key via a quoted heredoc ('!EOF.') so the key text is
        # passed through literally by the remote shell
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
232

    
233
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    The command is retried up to C{max_attempts} times; the first
    successful run wins.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    attempt = 0
    while attempt < max_attempts:
      attempt += 1
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user,
                                   use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result
251

    
252
  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    for mergee in self.clusters:
      result = self._RunCmd(mergee, "gnt-instance list -o status")
      statuses = result.output.splitlines()
      # Any status line matching a "running" state means the mergee is
      # not quiesced yet
      if any(status in self.RUNNING_STATUSES for status in statuses):
        return True

    return False
265

    
266
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If shutting down the instances fails

    """
    for mergee in self.clusters:
      # Force-shutdown everything on the mergee in one go
      result = self._RunCmd(mergee,
                            "gnt-instance shutdown --all --force-multiple")
      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (mergee, result.fail_reason, result.output))
278

    
279
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: If pausing the watcher fails on a cluster

    """
    # "localhost" covers our own cluster's watcher
    for target in ["localhost"] + self.clusters:
      pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period
      result = self._RunCmd(target, pause_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (target, result.fail_reason, result.output))
291

    
292
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If stopping a node's daemons fails

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        # Retried, since nodes may be slow to respond while shutting down
        result = self._RunCmd(node, stop_cmd, max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
305

    
306
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If reading a mergee's config fails

    """
    for data in self.merger_data:
      fetch_cmd = "cat %s" % constants.CLUSTER_CONF_FILE
      result = self._RunCmd(data.cluster, fetch_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Keep a local copy in the work dir for the config merge step
      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
325

    
326
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
338

    
339
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    For every mergee: checks parameter compatibility, merges node groups,
    then imports all nodes (offlined, to be re-onlined at readd) and all
    instances into the local configuration.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        # Each added object gets its own unique expected-config-id
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Allocate a fresh port from the local cluster's pool to avoid
            # clashing with ports already in use here
            port = my_config.AllocatePort()

            # Slot 2 of the logical_id tuple holds the DRBD port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # Slots 1 and 3 of the physical_id tuple hold the port as well
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
380

    
381
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Logs every difference found; strict differences (and all differences
    when running with --parameter-conflicts=strict) count as errors, and
    any error aborts the merge.  A few parameters are actually merged
    into the local config (reserved_lvs, prealloc_wipe_disks, osparams).

    @raise errors.ConfigurationError: If incompatible values are found

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # Parameters compared verbatim between the two clusters
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # Differences in these are errors even in "warn" mode
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    # (keys() + keys() is Python 2 list concatenation)
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Union of the reserved LV names of both clusters
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # Take over unknown OS params; conflicting ones are logged (and are
    # errors in strict mode), with the local values kept
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
525

    
526
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    """Looks up the per-OS hypervisor parameters on a cluster.

    @return: The parameters of C{hyp} for C{os_name}, or None if the OS
             has no os_hvp entry on that cluster

    """
    try:
      hvp_map = cluster.os_hvp[os_name]
    except KeyError:
      return None
    return hvp_map.get(hyp, None)
532

    
533
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    @raise errors.CommandError: If name conflicts exist and no --groups
                                strategy was supplied
    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy ([:]) since removing the node from the
          # group mutates the members list
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # Whatever remains in other_grps is conflict-free (or renamed) and can
    # simply be added to the local config
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
596

    
597
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Whether masterd should be started without voting
                    (default: False)
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
614

    
615
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        readd_cmd = ["gnt-node", "add", "--readd", "--no-ssh-key-check",
                     "--force-join", node]
        result = utils.RunCmd(readd_cmd)
        if result.failed:
          # Log and keep going; the redistribution below is still attempted
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))
634

    
635
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    startup_cmd = ["gnt-instance", "startup", "--all", "--force-multiple"]
    result = utils.RunCmd(startup_cmd)
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
648

    
649
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
662

    
663
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Each entry is a %-format string later filled with the `info` dict
    # below; cleared once the merge passes the point of no return
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      # Even when we stopped them ourselves, double-check nothing is left
      # running before touching the configs
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # Restart without voting: the other masters are down at this point
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      # Tell the user how to undo what has been done so far
      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

    
742
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # The fetched private keys and config copies all live inside work_dir,
    # so removing the tree disposes of them as well
    shutil.rmtree(self.work_dir)
750

    
751

    
752
def SetupLogging(options):
  """Setting up logging infrastructure.

  Attaches a stderr handler to the root logger; the handler's level is
  derived from the --debug/--verbose options.

  @param options: Parsed command line options

  """
  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))

  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
772

    
773

    
774
def main():
775
  """Main routine.
776

    
777
  """
778
  program = os.path.basename(sys.argv[0])
779

    
780
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
781
                                 prog=program)
782
  parser.add_option(cli.DEBUG_OPT)
783
  parser.add_option(cli.VERBOSE_OPT)
784
  parser.add_option(PAUSE_PERIOD_OPT)
785
  parser.add_option(GROUPS_OPT)
786
  parser.add_option(RESTART_OPT)
787
  parser.add_option(PARAMS_OPT)
788
  parser.add_option(SKIP_STOP_INSTANCES_OPT)
789

    
790
  (options, args) = parser.parse_args()
791

    
792
  SetupLogging(options)
793

    
794
  if not args:
795
    parser.error("No clusters specified")
796

    
797
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
798
                          options.groups, options.restart, options.params,
799
                          options.stop_instances)
800
  try:
801
    try:
802
      cluster_merger.Setup()
803
      cluster_merger.Merge()
804
    except errors.GenericError, e:
805
      logging.exception(e)
806
      return constants.EXIT_FAILURE
807
  finally:
808
    cluster_merger.Cleanup()
809

    
810
  return constants.EXIT_SUCCESS
811

    
812

    
813
# Script entry point: exit with the code returned by main()
if __name__ == "__main__":
  sys.exit(main())