Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 638ac34b

History | View | Annotate | Download (26 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
# Node-group conflict resolution strategies (values for the --groups option)
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Prefix for the fake "expected client id" values used while importing
# foreign nodes/instances into the local config
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# How long (seconds) ganeti-watcher stays paused on all involved clusters
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Strategy for node groups that exist (by name) on both clusters
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
60

    
61

    
62
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded in place (depth-first), every non-list item is
  kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  result = []
  # Iterative depth-first traversal: keep pending items on a stack instead
  # of recursing, reversed so items are emitted in their original order.
  pending = list(unflattened_list)
  pending.reverse()
  while pending:
    item = pending.pop()
    if isinstance(item, list):
      for sub_item in reversed(item):
        pending.append(sub_item)
    else:
      result.append(item)
  return result
77

    
78

    
79
class MergerData(object):
  """Plain record holding the per-cluster state gathered for the merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # config_path stays None until the remote config has been downloaded
    # (see Merger._FetchRemoteConfig, which fills it in later)
    self.config_path = config_path
    self.instances = instances
    self.nodes = nodes
    self.key_path = key_path
    self.cluster = cluster
98

    
99

    
100
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts

    """
    # Filled by Setup() with one MergerData entry per merging cluster
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys/configs; removed by Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Our own cluster's name, queried from the local master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
119

    
120
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # batch=False/ask_key=False: first contact with this cluster, so the
      # host key may be unknown and ssh must not block on a prompt
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the fetched key locally, readable by owner only (private key)
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # List remote nodes; the second column is the offline flag, "N"
      # meaning online
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      # Collect the instance names running on the remote cluster
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
166

    
167
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    Appends our root public key to the remote root's authorized_keys file on
    every online node of every merging cluster, so further commands no
    longer need the per-cluster private keys.

    @raise errors.RemoteError: if appending the key on a node fails

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    # Append via a quoted here-document so the key lands in a single command
    append_cmd = "cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, append_cmd, private_key=data.key_path)
        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
188

    
189
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
190
              strict_host_check=False, private_key=None, batch=True,
191
              ask_key=False):
192
    """Wrapping SshRunner.Run with default parameters.
193

    
194
    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
195

    
196
    """
197
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
198
                               use_cluster_key=use_cluster_key,
199
                               strict_host_check=strict_host_check,
200
                               private_key=private_key, batch=batch,
201
                               ask_key=ask_key)
202

    
203
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    Runs one remote shutdown per cluster; --force-multiple suppresses the
    interactive confirmation for shutting down several instances at once.

    @raise errors.RemoteError: if the remote shutdown fails

    """
    shutdown_cmd = "gnt-instance shutdown --all --force-multiple"
    for cluster in self.clusters:
      result = self._RunCmd(cluster, shutdown_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
215

    
216
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    The watcher would otherwise restart the instances/daemons we are about
    to stop, so it is paused for self.pause_period seconds everywhere.

    @raise errors.RemoteError: if pausing the watcher fails anywhere

    """
    # The pause period is constant, so the command can be built once
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period
    # "localhost" first: our own watcher must not interfere either
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, pause_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
228

    
229
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: if stopping the daemons on a node fails

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    # Flatten the per-cluster node lists and stop daemons node by node
    merging_nodes = [node
                     for data in self.merger_data
                     for node in data.nodes]
    for node in merging_nodes:
      result = self._RunCmd(node, stop_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to stop daemons on %s."
                                 " Fail reason: %s; output: %s." %
                                 (node, result.fail_reason, result.output))
242

    
243
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: if reading the remote config fails

    """
    fetch_cmd = "cat %s" % constants.CLUSTER_CONF_FILE
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, fetch_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Remember where the fetched config lives; _MergeConfig reads it later
      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
262

    
263
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
275

    
276
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Imports the cluster parameters, node groups, nodes and instances of
    every merging cluster into the local (offline) ConfigWriter.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      # Import every foreign node under a unique fake expected-client-id
      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Reserve a fresh port in our config so the imported disk does
            # not collide with ports already used locally
            port = my_config.AllocatePort()

            # logical_id/physical_id are tuples; rebuild them with the new
            # port in the slots the DRBD code reads it from
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
313

    
314
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Logs every incompatibility it finds; hard incompatibilities are counted
    and make the method raise at the end, soft ones are only warned about
    (the local value wins).

    @param my_config: the local cluster's ConfigWriter
    @param other_config: the merging cluster's ConfigWriter
    @raise errors.ConfigurationError: if any incompatible value was found

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # These cluster attributes must be identical on both sides; any
    # difference counts as a hard error.
    check_params = (
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "file_storage_dir",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      "volume_group_name",
      )
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    # The first enabled hypervisor is the default one; a mismatch is only
    # a warning, since the local default will simply win.
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    # Differing *sets* of enabled hypervisors are a hard error, though
    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          err_count += 1

    # Check os hypervisor params for hypervisors we care about
    # The union of OS names from both sides is checked (Py2 keys() lists)
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          err_count += 1

    #
    # Warnings
    #
    # Soft mismatches below: logged, local value takes precedence
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Reserved LVs: union of both clusters' lists
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # prealloc_wipe_disks: the least permissive setting (True) wins
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # OS parameters: import unknown OSes, error out on conflicting ones
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
445

    
446
  # R0201: Method could be a function
447
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
448
    if os_name in cluster.os_hvp:
449
      return cluster.os_hvp[os_name].get(hyp, None)
450
    else:
451
      return None
452

    
453
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    Name conflicts between local and remote groups are resolved according to
    self.groups: no strategy raises, _GROUPS_RENAME renames the remote
    group, _GROUPS_MERGE moves the remote group's nodes into the local
    group of the same name.

    @param my_config: the local cluster's ConfigWriter
    @param other_config: the merging cluster's ConfigWriter
    @raise errors.CommandError: on a name conflict without a strategy

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      # (compare against the module constant, not a bare 'merge' literal,
      # for consistency with the _GROUPS_RENAME branch above)
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy: members shrink while nodes are moved
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
516

    
517
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      # Voting is skipped right after the config merge, when the other
      # nodes don't agree on this node being the master yet
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
534

    
535
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    Every merged node is re-added locally (--readd), then the merged config
    is pushed out to all nodes.

    @raise errors.CommandError: If anything fails.

    """
    readd_base = ["gnt-node", "add", "--readd", "--no-ssh-key-check",
                  "--force-join"]
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(readd_base + [node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    # Finally push the merged configuration to every node
    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))
555

    
556
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
569

    
570
  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if not result.failed:
      return
    raise errors.CommandError("Verification of cluster failed."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
582

    
583
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions, printed if we fail before the
    # point of no return; %(nodes)s / %(clusters)s are filled in on failure
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # no_vote: the other nodes don't know about the merged config yet,
      # so a normal (voting) master start would fail here
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      # Print the collected rollback instructions (if any) so the admin
      # can undo the partially-done merge by hand
      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
653

    
654
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # The fetched private keys and configs all live below work_dir, so
    # removing the tree disposes of all of them in one go
    shutil.rmtree(self.work_dir)
662

    
663

    
664
def SetupLogging(options):
  """Setting up logging infrastructure.

  Installs a stderr handler on the root logger whose threshold follows the
  --debug/--verbose options; the root logger itself passes everything on.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)

  # Derive the handler threshold once from the verbosity options
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  stderr_handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
684

    
685

    
686
def main():
  """Main routine.

  Parses options, sets up logging and drives the Merger; the work
  directory is cleaned up even when the merge fails.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  # Duplicate cluster names on the command line are silently dropped
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    # Always remove the temporary work directory, even on failure
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())