Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ a3fad332

History | View | Annotate | Download (30.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43

    
44

    
45
_GROUPS_MERGE = "merge"
46
_GROUPS_RENAME = "rename"
47
_CLUSTERMERGE_ECID = "clustermerge-ecid"
48
_RESTART_ALL = "all"
49
_RESTART_UP = "up"
50
_RESTART_NONE = "none"
51
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
52
_PARAMS_STRICT = "strict"
53
_PARAMS_WARN = "warn"
54
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
55

    
56

    
57
# Command line options used by this tool
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# Help text fixed: it was copy-pasted from GROUPS_OPT and read
# "How to handle restarting instances same name"
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# Note: default=True with action="store_false" means that passing the
# option sets options.stop_instances to False
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
89

    
90

    
91
def Flatten(unflattened_list):
  """Recursively flattens nested lists into one flat list.

  @param unflattened_list: A list which may contain further lists.
  @return: A new list with every non-list item at the top level

  """
  result = []

  for element in unflattened_list:
    # Only genuine lists are flattened; tuples and strings pass through
    if isinstance(element, list):
      result += Flatten(element)
    else:
      result.append(element)
  return result
106

    
107

    
108
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config; filled in later
                        by Merger._FetchRemoteConfig if not given here

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.master_node = master_node
    # config_path stays None until the remote config has been downloaded
    self.config_path = config_path
130

    
131

    
132
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking the mergees
  # (see _CheckRunningInstances)
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
140

    
141
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts (strict/warn)
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and configs; removed in Cleanup
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Our own cluster name, used to refuse merging a cluster with itself
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Restarting only previously-up instances is not supported yet
    if self.restart == _RESTART_UP:
      raise NotImplementedError
166

    
167
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      # First contact uses password/agent auth (batch=False); subsequent
      # commands authenticate with the fetched private key
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      # Keep only online nodes ("N" in the offline column)
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # The master node name is read from the remote ssconf file
      path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))
223

    
224
  def _PrepareAuthorizedKeys(self):
    """Distribute our root public key to every node of the mergee clusters.

    Appends the local public key to each remote authorized_keys file so
    that later ssh commands can authenticate without the per-cluster keys.

    @raise errors.RemoteError: if the key cannot be added on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)
    # Heredoc append; built once, reused for every node
    append_cmd = "cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key)

    for cluster_data in self.merger_data:
      for node in cluster_data.nodes:
        result = self._RunCmd(node, append_cmd,
                              private_key=cluster_data.key_path,
                              max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, cluster_data.cluster,
                                    result.fail_reason, result.output))
245

    
246
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: Number of times to try the command before giving
                         up; must be at least 1
    @return: The result of the last attempt
    @raise errors.ProgrammerError: If max_attempts is smaller than 1

    """
    # Guard against "result" being unbound below: the previous version
    # raised UnboundLocalError when called with max_attempts < 1
    if max_attempts < 1:
      raise errors.ProgrammerError("max_attempts must be at least 1")

    result = None
    for _ in range(max_attempts):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      # Stop retrying on the first success
      if not result.failed:
        break

    return result
264

    
265
  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise
    @raise errors.RemoteError: If the instance status list cannot be retrieved

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      # Previously a failed command was silently treated as "nothing
      # running"; fail loudly instead, consistent with the other remote
      # command checks in this class
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instance statuses"
                                 " from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      if self.RUNNING_STATUSES.intersection(result.output.splitlines()):
        return True

    return False
278

    
279
  def _StopMergingInstances(self):
    """Shut down every instance on each mergee cluster.

    @raise errors.RemoteError: if a shutdown command fails

    """
    shutdown_cmd = "gnt-instance shutdown --all --force-multiple"

    for cluster in self.clusters:
      result = self._RunCmd(cluster, shutdown_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
291

    
292
  def _DisableWatcher(self):
    """Pause the watcher on the local cluster and on every mergee.

    @raise errors.RemoteError: if pausing the watcher fails anywhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period

    # "localhost" first: our own watcher must not interfere either
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, pause_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
304

    
305
  def _RemoveMasterIps(self):
    """Deactivate the master IP on every mergee master node.

    @raise errors.RemoteError: if deactivation fails on a master node

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip")
      if result.failed:
        message = ("Unable to remove master IP on %s."
                   " Fail reason: %s; output: %s" %
                   (data.master_node, result.fail_reason, result.output))
        raise errors.RemoteError(message)
319

    
320
  def _StopDaemons(self):
    """Stop all Ganeti daemons on every mergee node.

    @raise errors.RemoteError: if a node fails to stop its daemons

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, stop_cmd, max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
333

    
334
  def _FetchRemoteConfig(self):
    """Download each mergee's cluster configuration into the work directory.

    Required before the configurations can be merged locally; stores the
    local path in each MergerData's config_path.

    @raise errors.RemoteError: if a config cannot be read remotely

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster,
                            "cat %s" % constants.CLUSTER_CONF_FILE)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
353

    
354
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Stops the master daemon running on this node.

    @raise errors.CommandError: If unable to kill

    """
    stop_result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if stop_result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (stop_result.fail_reason, stop_result.output))
366

    
367
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Adds every mergee's nodes (offlined) and instances to the local
    configuration, after checking/merging the cluster-level parameters
    and node groups.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Allocate a fresh port from our cluster to avoid collisions
            # with locally assigned DRBD ports
            port = my_config.AllocatePort()

            # logical_id[2] holds the DRBD port (see disk logical_id layout)
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # physical_id[1] and [3] hold the two endpoints' ports
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
408

    
409
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Logs every difference; differences in strict parameters (or any
    difference, when running with --parameter-conflicts=strict) are
    counted and make the merge abort at the end.

    @param my_config: Local L{config.ConfigWriter}
    @param other_config: Mergee's L{config.ConfigWriter}
    @raise errors.ConfigurationError: If incompatible values were found

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # These always count as errors, even with --parameter-conflicts=warn
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    # NOTE: py2 idiom - keys() returns lists, concatenated before set()
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    # prealloc_wipe_disks: the safer (True) value always wins
    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
553

    
554
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Return the OS hypervisor parameters for C{hyp}, or None if unset."""
    if os_name not in cluster.os_hvp:
      return None
    return cluster.os_hvp[os_name].get(hyp, None)
560

    
561
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    @param my_config: Local L{config.ConfigWriter}
    @param other_config: Mergee's L{config.ConfigWriter}
    @raise errors.CommandError: If groups conflict and no --groups strategy
                                was supplied
    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          # Suffix the remote group with its cluster name to disambiguate
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy: membership is mutated inside the loop
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # Whatever is left in other_grps gets added as a new group
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
624

    
625
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    daemon_env = {}
    if no_vote:
      # Needed right after the merge, when we cannot win a masterd vote yet
      daemon_env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    start_result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"],
                                env=daemon_env)
    if start_result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (start_result.fail_reason,
                                 start_result.output))
642

    
643
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    Readd failures are only logged (the node stays offline); a failed
    config redistribution aborts.

    @raise errors.CommandError: If anything fails.

    """
    readd_base = ["gnt-node", "add", "--readd", "--no-ssh-key-check",
                  "--force-join"]

    for data in self.merger_data:
      for node in data.nodes:
        readd_result = utils.RunCmd(readd_base + [node])
        if readd_result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, readd_result.fail_reason, readd_result.output)

    redist_result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if redist_result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (redist_result.fail_reason,
                                                 redist_result.output))
662

    
663
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    startup_result = utils.RunCmd(["gnt-instance", "startup", "--all",
                                   "--force-multiple"])
    if startup_result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (startup_result.fail_reason,
                                 startup_result.output))
676

    
677
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If cluster fails on verification

    """
    verify_result = utils.RunCmd(["gnt-cluster", "verify"])
    if verify_result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (verify_result.fail_reason,
                                 verify_result.output))
690

    
691
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions, printed if the merge fails
    # before the point of no return; %(nodes)s / %(clusters)s are filled
    # in from the "info" dict in the except clause below
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # --no-voting: the merged config is not distributed yet, so masterd
      # could not win a vote among the (still unaware) nodes
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
771

    
772
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # The fetched keys and configs all live under work_dir, so removing
    # the tree is sufficient
    shutil.rmtree(self.work_dir)
780

    
781

    
782
def SetupLogging(options):
  """Configure the root logger to log to stderr.

  The stderr handler's threshold depends on the --debug/--verbose flags.

  @param options: Parsed command line options

  """
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
802

    
803

    
804
def main():
805
  """Main routine.
806

    
807
  """
808
  program = os.path.basename(sys.argv[0])
809

    
810
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
811
                                 prog=program)
812
  parser.add_option(cli.DEBUG_OPT)
813
  parser.add_option(cli.VERBOSE_OPT)
814
  parser.add_option(PAUSE_PERIOD_OPT)
815
  parser.add_option(GROUPS_OPT)
816
  parser.add_option(RESTART_OPT)
817
  parser.add_option(PARAMS_OPT)
818
  parser.add_option(SKIP_STOP_INSTANCES_OPT)
819

    
820
  (options, args) = parser.parse_args()
821

    
822
  SetupLogging(options)
823

    
824
  if not args:
825
    parser.error("No clusters specified")
826

    
827
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
828
                          options.groups, options.restart, options.params,
829
                          options.stop_instances)
830
  try:
831
    try:
832
      cluster_merger.Setup()
833
      cluster_merger.Merge()
834
    except errors.GenericError, e:
835
      logging.exception(e)
836
      return constants.EXIT_FAILURE
837
  finally:
838
    cluster_merger.Cleanup()
839

    
840
  return constants.EXIT_SUCCESS
841

    
842

    
843
if __name__ == "__main__":
844
  sys.exit(main())