Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ a6c8fd10

History | View | Annotate | Download (25.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable-msg=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
# Strategies for handling node groups that exist with the same name in
# both clusters (values accepted by the --groups option below).
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Prefix for the fake "ec id" passed to ConfigWriter.AddNode/AddInstance/
# AddNodeGroup during the merge (a counter suffix is appended per call).
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# How long (seconds) the watcher is paused on all involved clusters.
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
# Conflict-resolution strategy for equally-named node groups; no default,
# so a conflict without this option is a hard error (see _MergeNodeGroups).
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
60

    
61

    
62
def Flatten(unflattened_list):
  """Flattens a list, expanding nested lists in order.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flat = []
  # Explicit stack instead of recursion; reversed() keeps original order
  stack = list(reversed(unflattened_list))
  while stack:
    current = stack.pop()
    if isinstance(current, list):
      stack.extend(reversed(current))
    else:
      flat.append(current)
  return flat
77

    
78

    
79
class MergerData(object):
  """Record of everything gathered about one cluster being merged in.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Store the merge-relevant details of one remote cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    # config_path is filled in later by Merger._FetchRemoteConfig
    self.config_path = config_path
    self.instances = instances
    self.nodes = nodes
    self.key_path = key_path
    self.cluster = cluster
98

    
99

    
100
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts (one of the _GROUPS_*
      strategies, or None to fail on conflict)

    """
    # Filled by Setup() with one MergerData per remote cluster
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and configs; removed by Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Name of the local (surviving) cluster, asked from the master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
119

    
120
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing
    @raise errors.CommandError: when asked to merge the cluster with itself

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Merging a cluster into itself makes no sense and would wreck the config
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # NOTE(review): batch=False/ask_key=False on this very first contact —
      # presumably to allow interactive auth before keys are exchanged;
      # confirm against ssh.SshRunner.Run semantics
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the remote key locally; 0600 since it is a private key
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # All further commands authenticate with the fetched remote key
      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
164

    
165
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key through a shell here-document; the command
        # itself still authenticates with the merging cluster's private key
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
186

    
187
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Thin convenience wrapper around L{ganeti.ssh.SshRunner.Run}.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}; this
    method only supplies merge-friendly defaults.

    """
    run_kwargs = {
      "hostname": hostname,
      "command": command,
      "user": user,
      "use_cluster_key": use_cluster_key,
      "strict_host_check": strict_host_check,
      "private_key": private_key,
      "batch": batch,
      "ask_key": ask_key,
      }
    return self.ssh_runner.Run(**run_kwargs)
200

    
201
  def _StopMergingInstances(self):
    """Shut down every instance on all merging clusters.

    @raise errors.RemoteError: if a remote shutdown command fails

    """
    for cluster in self.clusters:
      outcome = self._RunCmd(cluster,
                             "gnt-instance shutdown --all --force-multiple")
      if not outcome.failed:
        continue
      raise errors.RemoteError("Unable to stop instances on %s."
                               " Fail reason: %s; output: %s" %
                               (cluster, outcome.fail_reason, outcome.output))
213

    
214
  def _DisableWatcher(self):
    """Pause the watcher on every involved cluster, ourselves included.

    @raise errors.RemoteError: if pausing the watcher fails anywhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period
    # "localhost" covers our own cluster in addition to the merging ones
    for cluster in ["localhost"] + self.clusters:
      outcome = self._RunCmd(cluster, pause_cmd)
      if outcome.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, outcome.fail_reason,
                                  outcome.output))
226

    
227
  def _StopDaemons(self):
    """Stop all Ganeti daemons on every merging node.

    @raise errors.RemoteError: if stopping the daemons fails on a node

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    # Flatten the per-cluster node lists into one sequence, keeping order
    for node in Flatten([data.nodes for data in self.merger_data]):
      outcome = self._RunCmd(node, stop_cmd)
      if outcome.failed:
        raise errors.RemoteError("Unable to stop daemons on %s."
                                 " Fail reason: %s; output: %s." %
                                 (node, outcome.fail_reason, outcome.output))
240

    
241
  def _FetchRemoteConfig(self):
    """Download each merging cluster's config file from its master.

    The local copies are required before the configs can be merged; each
    path is recorded on the corresponding L{MergerData}.

    @raise errors.RemoteError: if reading a remote config fails

    """
    for data in self.merger_data:
      fetch = self._RunCmd(data.cluster,
                           "cat %s" % constants.CLUSTER_CONF_FILE)
      if fetch.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, fetch.fail_reason,
                                  fetch.output))

      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=fetch.stdout)
260

    
261
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Stop the local master daemon via daemon-util.

    @raise errors.CommandError: If unable to kill

    """
    stop = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if stop.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (stop.fail_reason, stop.output))
273

    
274
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Cluster-level parameters and node groups are merged first, then every
    foreign node and instance is added to the local config, re-allocating
    DRBD ports from the local port pool on the way.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      # Every call gets its own fake ec-id (_CLUSTERMERGE_ECID + counter)
      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Ports allocated on the foreign cluster may clash with local
            # ones, so take a fresh port from the local allocator
            port = my_config.AllocatePort()

            # The ids are tuples, so rebuild them with the new port spliced
            # in; index 2 of logical_id and indexes 1/3 of physical_id hold
            # the port — NOTE(review): confirm against objects.Disk layout
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
311

    
312
  # R0201: Method could be a function
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Logs every incompatibility found and raises at the end if any error-level
    mismatch was seen; warning-level differences are resolved in favour of
    the local cluster. Also performs the small amount of actual cluster-level
    merging (reserved_lvs union, prealloc_wipe_disks, osparams).

    @param my_config: the local cluster's L{config.ConfigWriter}
    @param other_config: the merging cluster's L{config.ConfigWriter}
    @raise errors.ConfigurationError: if any incompatible value was found

    """
    # pylint: disable-msg=R0201
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    # Parameters that must simply be equal on both clusters
    check_params = (
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "file_storage_dir",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      "volume_group_name",
      )
    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor; a difference is only a warning because new
    # instances simply get the local default
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    # TODO: we probably don't care about all params for a given hypervisor
    # NOTE(review): assumes other_cluster.hvparams carries the same hyp and
    # param keys as ours — a missing key would raise KeyError here; confirm
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Union of both reserved-LV lists, duplicates removed
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # prealloc_wipe_disks: the least permissive value (True) wins
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # osparams: adopt unknown OSes, flag conflicting definitions
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
443

    
444
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable-msg=R0201
    """Look up the per-OS parameters of one hypervisor.

    @return: the os_hvp entry for C{os_name}/C{hyp}, or None when the OS
      (or the hypervisor under it) has no entry

    """
    hvp_by_os = cluster.os_hvp
    if os_name not in hvp_by_os:
      return None
    return hvp_by_os[os_name].get(hyp, None)
450

    
451
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups, resolving name conflicts.

    Conflicting group names are handled according to self.groups: no
    strategy raises, _GROUPS_RENAME renames the remote group, and
    _GROUPS_MERGE moves the remote group's nodes into the local group of
    the same name. ConfigWriter.AddNodeGroup takes care of making sure
    there are no conflicts.

    @param my_config: the local cluster's L{config.ConfigWriter}
    @param other_config: the merging cluster's L{config.ConfigWriter}
    @raise errors.CommandError: on a name conflict without a strategy

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      # (was the literal 'merge'; use the shared constant like the choices
      # of GROUPS_OPT and the rename branch above do)
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy, since the membership list is being changed
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
514

    
515
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    start = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if start.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (start.fail_reason, start.output))
532

    
533
  def _ReaddMergedNodesAndRedist(self):
    """Readd every merged node locally, then redistribute the config.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        readd = utils.RunCmd(["gnt-node", "add", "--readd",
                              "--no-ssh-key-check", "--force-join", node])
        if readd.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, readd.fail_reason,
                                                     readd.output))

    # Push the merged configuration out to all nodes
    redist = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if redist.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (redist.fail_reason,
                                                 redist.output))
553

    
554
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    startup = utils.RunCmd(["gnt-instance", "startup", "--all",
                            "--force-multiple"])
    if startup.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (startup.fail_reason, startup.output))
567

    
568
  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to check the cluster's health.

    @raise errors.CommandError: If the verification fails

    """
    verify = utils.RunCmd(["gnt-cluster", "verify"])
    if verify.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (verify.fail_reason, verify.output))
580

    
581
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions; the %(nodes)s / %(clusters)s
    # placeholders are substituted only if/when a failure is reported below
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # no_vote: we just rewrote the config, other nodes would vote us down
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      # Tell the operator how to roll back whatever was done so far
      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
651

    
652
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # The fetched private keys and config copies all live in work_dir,
    # so removing the directory removes them too
    shutil.rmtree(self.work_dir)
660

    
661

    
662
def SetupLogging(options):
  """Configure stderr logging according to the parsed options.

  Verbosity: --debug shows everything, --verbose shows INFO and above,
  otherwise only WARNING and above reach the user.

  @param options: Parsed command line options

  """
  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))

  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  handler.setLevel(level)

  # Root logger passes everything through; the handler does the filtering
  root = logging.getLogger("")
  root.setLevel(logging.NOTSET)
  root.addHandler(handler)
682

    
683

    
684
def main():
  """Main routine.

  Parses options, runs the merge and always cleans up the scratch
  directory, returning a process exit code.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  # Duplicate cluster arguments are silently dropped
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
  # Nested try blocks instead of try/except/finally — presumably kept for
  # Python 2.4 compatibility
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    # Always remove fetched keys/configs, even on failure
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS
721

    
722

    
723
# Script entry point; exit status comes from main()
if __name__ == "__main__":
  sys.exit(main())