root / tools / cluster-merge @ 07ff0a78

#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge
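
# Example invocation (illustrative; the cluster names are placeholders):
#   cluster-merge --watcher-pause-period 3600 --groups rename --restart all \
#     cluster2.example.com cluster3.example.com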

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))
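
# Restart strategies: "all" starts every instance on the merged cluster once
# the merge is done, "none" leaves them stopped, and "up" (restart only the
# instances that were running before the merge) is not implemented yet.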


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
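
# Illustrative example: Flatten([1, [2, [3, 4]], 5]) returns [1, 2, 3, 4, 5].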


class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups, restart, params):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle cluster parameter conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    if self.restart == _RESTART_UP:
      raise NotImplementedError


  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets us up in preparation for the merger. It makes the
    initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch each remote cluster's private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      # Keep only online nodes ("N" in the offline column)
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the authorized_keys of every remote
    node for further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
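        # Append our public key to the node's authorized_keys via a shell
        # here-document, authenticating with the private key fetched from
        # that cluster in Setup().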
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                 user=user, use_cluster_key=use_cluster_key,
                                 strict_host_check=strict_host_check,
                                 private_key=private_key, batch=batch,
                                 ask_key=ask_key)
      if not result.failed:
        break
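    # If every attempt failed, the last (failed) result is returned so the
    # caller can inspect fail_reason and output.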

    return result

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node; it will be re-onlined later when it is re-added
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
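        # Assumption (inferred from the indices used below, not from the
        # objects.Disk definition): logical_id[2] and physical_id[1]/[3]
        # carry the DRBD network port, so a port allocated from the local
        # cluster replaces the remote one to avoid collisions after the merge.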
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)
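
    # Parameters in check_params_strict always count as errors when they
    # differ between the clusters; the others are only logged unless
    # --parameter-conflicts=strict (the default) is in effect.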

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs from this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs from"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs from this cluster's"
                        " parameter (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differ from this cluster's"
                        " parameters (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs from"
                      " this cluster's value (%s). The least permissive value"
                      " (%s) will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differ from"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)

  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    if os_name in cluster.os_hvp:
      return cluster.os_hvp[os_name].get(hyp, None)
    else:
      return None

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      # TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should masterd be started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                         node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. It also keeps track of rollback steps to undo everything.

    """
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This removes the downloaded private keys and configs by deleting
    the temporary work directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Setting up logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.WARNING)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage="%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())