Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 620a9c62

History | View | Annotate | Download (27 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
_GROUPS_MERGE = "merge"
46
_GROUPS_RENAME = "rename"
47
_CLUSTERMERGE_ECID = "clustermerge-ecid"
48
_RESTART_ALL = "all"
49
_RESTART_UP = "up"
50
_RESTART_NONE = "none"
51
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52

    
53

    
54
# How long (seconds) the watcher on every involved cluster is paused while
# the merge runs; default is half an hour.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Strategy for node groups that exist on both clusters under the same name.
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
# Strategy for restarting instances once the merge has completed.
# NOTE: the previous help text read "How to handle restarting instances
# same name (...)", a copy-paste remnant from GROUPS_OPT; fixed here.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))
72

    
73

    
74
def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flat = []
  # Depth-first, left-to-right traversal with an explicit stack instead of
  # the recursive formulation; only objects that are lists are expanded.
  stack = list(unflattened_list)
  stack.reverse()
  while stack:
    current = stack.pop()
    if isinstance(current, list):
      # Push the children reversed so the leftmost one is popped first.
      stack.extend(reversed(current))
    else:
      flat.append(current)
  return flat
89

    
90

    
91
class MergerData(object):
  """Record of the per-cluster state gathered for a merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # config_path stays None until the remote config has been fetched.
    (self.cluster, self.key_path, self.nodes,
     self.instances, self.config_path) = (cluster, key_path, nodes,
                                          instances, config_path)
110

    
111

    
112
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups, restart):
    """Initialize object with sane defaults and infos required.

    Note that constructing a Merger contacts the local master daemon to
    look up our own cluster name, so masterd must be running.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @raise NotImplementedError: if restart is _RESTART_UP (not supported yet)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and remote configs;
    # removed again by Cleanup().
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Ask the (running) local master daemon for our own cluster name.
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    # Restarting only the previously-up instances is not implemented yet.
    if self.restart == _RESTART_UP:
      raise NotImplementedError
135

    
136

    
137
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Merging a cluster into itself makes no sense and would corrupt it.
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # NOTE(review): batch=False presumably allows interactive
      # authentication on this first contact -- confirm against
      # SshRunner.Run's parameter semantics.
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the remote cluster's root key locally, owner-only (0600),
      # so later commands can authenticate with it non-interactively.
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      # Keep only nodes whose "offline" column is "N", i.e. online nodes.
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
183

    
184
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to the authorized_keys file of every
    online node on the merging clusters, so all further communication can
    use our own credentials.

    @raise errors.RemoteError: if appending the key fails on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append the key via a shell here-document; the quoted '!EOF.'
        # delimiter keeps the remote shell from expanding the key material.
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
205

    
206
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Runs a command on a host via SSH, with merger-friendly defaults.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    run_kwargs = {
      "hostname": hostname,
      "command": command,
      "user": user,
      "use_cluster_key": use_cluster_key,
      "strict_host_check": strict_host_check,
      "private_key": private_key,
      "batch": batch,
      "ask_key": ask_key,
      }
    return self.ssh_runner.Run(**run_kwargs)
219

    
220
  def _StopMergingInstances(self):
    """Shuts down every instance on the clusters being merged in.

    @raise errors.RemoteError: if the remote shutdown command fails

    """
    for merging_cluster in self.clusters:
      shutdown_result = self._RunCmd(merging_cluster,
                                     "gnt-instance shutdown --all"
                                     " --force-multiple")
      if not shutdown_result.failed:
        continue
      raise errors.RemoteError("Unable to stop instances on %s."
                               " Fail reason: %s; output: %s" %
                               (merging_cluster, shutdown_result.fail_reason,
                                shutdown_result.output))
232

    
233
  def _DisableWatcher(self):
    """Pauses the watcher on every involved cluster, ourself included.

    @raise errors.RemoteError: if pausing the watcher fails anywhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period
    for target in ["localhost"] + self.clusters:
      pause_result = self._RunCmd(target, pause_cmd)
      if pause_result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (target, pause_result.fail_reason,
                                  pause_result.output))
245

    
246
  def _StopDaemons(self):
    """Stops all Ganeti daemons on every merging node.

    @raise errors.RemoteError: if stopping the daemons fails on a node

    """
    # The command is identical everywhere, so build it once.
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    for cluster_data in self.merger_data:
      for node_name in cluster_data.nodes:
        stop_result = self._RunCmd(node_name, stop_cmd)
        if stop_result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node_name, stop_result.fail_reason,
                                    stop_result.output))
259

    
260
  def _FetchRemoteConfig(self):
    """Downloads every merging cluster's config file from its master.

    The copies are stored inside the working directory and are a
    prerequisite for the actual config merge.

    @raise errors.RemoteError: if the remote config cannot be read

    """
    for cluster_data in self.merger_data:
      fetch_result = self._RunCmd(cluster_data.cluster,
                                  "cat %s" % constants.CLUSTER_CONF_FILE)
      if fetch_result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (cluster_data.cluster,
                                  fetch_result.fail_reason,
                                  fetch_result.output))

      # Remember where the copy lives for the later merge step.
      cluster_data.config_path = utils.PathJoin(
        self.work_dir, "%s_config.data" % cluster_data.cluster)
      utils.WriteFile(cluster_data.config_path, data=fetch_result.stdout)
279

    
280
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Stops the master daemon on the local machine.

    @raise errors.CommandError: If unable to kill

    """
    stop_result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if not stop_result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (stop_result.fail_reason, stop_result.output))
292

    
293
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    After the cluster-level compatibility checks and node group merging,
    every foreign node and instance is imported into the local (offline)
    configuration.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      # Import every foreign node; each AddNode call gets its own
      # execution-context id, hence the increasing fake_ec_id suffix.
      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Reallocate the port from our own pool so it cannot clash
            # with a port already used by a local instance.
            port = my_config.AllocatePort()

            # The ids are tuples, so rebuild them with the new port
            # slotted into the positions that held the old one.
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
330

    
331
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Incompatible values are only counted and logged, so that all problems
    are reported in one go; a single exception is raised at the end.
    Mergeable values are folded into C{my_config}'s cluster object in place.

    @param my_config: The local cluster's config writer
    @param other_config: The merging cluster's config writer
    @raise errors.ConfigurationError: if any incompatible value was found

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # These parameters must be identical on both clusters.
    check_params = (
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "file_storage_dir",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      "volume_group_name",
      )
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor; a mismatch is only warned about, new
    # instances simply get our default.
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    # The *set* of enabled hypervisors, however, must match.
    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          err_count += 1

    #
    # Warnings
    #
    # These settings differ per cluster and simply keep our local value.
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Reserved LVs are simply unioned.
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # prealloc_wipe_disks: True is the more restrictive (safer) setting,
    # so it wins whenever the clusters disagree.
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # Foreign OS parameters are adopted when unknown locally; a clash of
    # values for a known OS is an error.
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
462

    
463
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    """Looks up the OS-specific parameters for one hypervisor.

    @param cluster: Cluster config object to look into
    @param os_name: Name of the operating system
    @param hyp: Name of the hypervisor
    @return: The parameters, or None when there is no entry

    """
    # A missing OS entry and a missing hypervisor entry both mean
    # "no OS-specific overrides", hence the chained lookups.
    return cluster.os_hvp.get(os_name, {}).get(hyp, None)
469

    
470
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    Conflicting (same-named) groups are resolved according to the
    strategy chosen with --groups; all remaining foreign groups are then
    added to the local config.

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    @param my_config: The local cluster's config writer
    @param other_config: The merging cluster's config writer
    @raise errors.CommandError: on name conflicts without a chosen strategy

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups.  Fix: compare against the
      # _GROUPS_MERGE constant instead of the stray 'merge' string literal,
      # so the comparison cannot drift from the --groups choices.
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy, the member list shrinks while we move nodes.
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
533

    
534
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the master daemon on the local machine.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    # Voting is disabled through daemon-util's environment override.
    extra_env = {}
    if no_vote:
      extra_env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    start_result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"],
                                env=extra_env)
    if start_result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (start_result.fail_reason,
                                 start_result.output))
551

    
552
  def _ReaddMergedNodesAndRedist(self):
    """Re-adds all merged nodes and redistributes the new config.

    @raise errors.CommandError: If anything fails.

    """
    for cluster_data in self.merger_data:
      for node_name in cluster_data.nodes:
        readd_result = utils.RunCmd(["gnt-node", "add", "--readd",
                                     "--no-ssh-key-check", "--force-join",
                                     node_name])
        if readd_result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" %
                                    (node_name, readd_result.fail_reason,
                                     readd_result.output))

    # Push the merged configuration out to every node.
    redist_result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if redist_result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (redist_result.fail_reason,
                                                 redist_result.output))
572

    
573
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts every instance on the (now merged) local cluster.

    @raise errors.CommandError: If unable to start clusters

    """
    startup_result = utils.RunCmd(["gnt-instance", "startup", "--all",
                                   "--force-multiple"])
    if startup_result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (startup_result.fail_reason,
                                 startup_result.output))
586

    
587
  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster verification fails

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
599

    
600
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions; the %(nodes)s/%(clusters)s
    # placeholders are only expanded if we actually have to print them.
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # The merged config exists only locally so far, so masterd must
      # come up without the usual voting.
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        # Expand the placeholders recorded above.
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
673

    
674
  def Cleanup(self):
    """Clean up our environment.

    Removing the working directory drops the fetched ssh private keys and
    the downloaded remote configuration files in one go.

    """
    shutil.rmtree(self.work_dir)
682

    
683

    
684
def SetupLogging(options):
  """Configures the root logger to report to stderr.

  @param options: Parsed command line options

  """
  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))

  # Filtering happens on the handler; the root logger passes everything.
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
704

    
705

    
706
def main():
  """Main routine.

  Parses the command line, sets up logging and drives the merge,
  always cleaning up temporary state afterwards.

  @return: constants.EXIT_SUCCESS or constants.EXIT_FAILURE

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " [--restart [%s|%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME,
                                         _RESTART_ALL, _RESTART_UP,
                                         _RESTART_NONE)),
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  # Deduplicate the cluster arguments so no cluster is merged twice.
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    # Always remove the temporary keys/configs, even on failure.
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS
747

    
748

    
749
# Script entry point; the exit code propagates the merge result to the shell.
if __name__ == "__main__":
  sys.exit(main())