#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!
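
Example invocation, merging two remote clusters into the local one (the
hostnames below are illustrative only):

  cluster-merge --groups=merge --watcher-pause-period=3600 cluster2.example.com cluster3.example.com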

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting of instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list which may contain nested lists
  @return: A flattened list
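
  For example, Flatten([1, [2, [3, 4]], 5]) returns [1, 2, 3, 4, 5].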

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on the merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups, restart, params):
    """Initialize the object with sane defaults and the required information.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time the watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    if self.restart == _RESTART_UP:
      raise NotImplementedError


  def Setup(self):
    """Sets up our end so we can do the merger.

    This method prepares the local cluster for the merge. It makes the
    initial contact with the remote clusters and gathers the information
    needed.

    @raise errors.RemoteError: for errors in communication or data retrieval

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch the remote clusters' ssh private keys
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
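      # Keep only the online nodes: the second field is the "offline" flag,
      # and "N" means the node is not marked offline.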
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the remote nodes' authorized_keys
    files for further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)
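
    # The key is appended through a quoted here-document so its content is
    # passed to the remote shell verbatim, without any expansion.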
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill the master daemon

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
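        # A fresh port is allocated from this cluster's pool; the port slots
        # of logical_id (index 2) and physical_id (indices 1 and 3) are
        # rewritten so the imported disks do not clash with existing DRBD
        # devices.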
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible.

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
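    # Differences in the following parameters are always counted as errors,
    # even with --parameter-conflicts=warn (see the err_count handling below).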
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs from this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs from"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs from this cluster's"
                        " parameter (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differ from this cluster's"
                        " parameters (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs from"
                      " this cluster's value (%s). The least permissive value"
                      " (%s) will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differ from"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)
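
    # Any groups still in other_grps (either without a name conflict, or
    # renamed above) are added to the local configuration as new node groups.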
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the master daemon be started without voting?
      Defaults to False.
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify cluster health.

    @raise errors.CommandError: If the cluster verification fails

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and keeps the user informed
    about the steps taken. It also keeps track of rollback steps so that
    everything can be undone manually if needed.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
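      # The merging clusters' daemons are stopped and their nodes have not
      # been re-added yet, so the master daemon could not win a voting round;
      # start it without voting here and restart it normally further down.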
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to roll back, do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to roll back.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This deletes the temporary directory holding the private keys and
    configs fetched from the merging clusters.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Sets up the logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())