Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ 224ff0f7

History | View | Annotate | Download (30.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43
from ganeti import pathutils
44

    
45

    
46
_GROUPS_MERGE = "merge"
47
_GROUPS_RENAME = "rename"
48
_CLUSTERMERGE_ECID = "clustermerge-ecid"
49
_RESTART_ALL = "all"
50
_RESTART_UP = "up"
51
_RESTART_NONE = "none"
52
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
53
_PARAMS_STRICT = "strict"
54
_PARAMS_WARN = "warn"
55
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)
56

    
57

    
58
# Command line options for this tool; registered with the parser in main()
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

# Help text fixed: it previously read "How to handle restarting instances
# same name", a leftover copy-paste from GROUPS_OPT above.
RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

# NOTE: default=True with action="store_false" means passing the flag
# *disables* stopping the instances (options.stop_instances becomes False)
SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
90

    
91

    
92
def Flatten(unflattened_list):
  """Flattens a list.

  Any element that is itself a list is expanded in place (recursively),
  preserving left-to-right order.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flat = []
  # Explicit stack instead of recursion; reversed so pops keep order
  stack = list(unflattened_list)
  stack.reverse()

  while stack:
    entry = stack.pop()
    if isinstance(entry, list):
      stack.extend(reversed(entry))
    else:
      flat.append(entry)

  return flat
107

    
108

    
109
class MergerData(object):
  """Plain record holding the per-cluster data gathered for the merge.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               config_path=None):
    """Store the information collected about one mergee cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param config_path: Path to the merging cluster config

    """
    # Attributes are plain storage; config_path is filled in later by
    # Merger._FetchRemoteConfig() when left at its default
    self.config_path = config_path
    self.master_node = master_node
    self.instances = instances
    self.nodes = nodes
    self.key_path = key_path
    self.cluster = cluster
131

    
132

    
133
class Merger(object):
  """Handling the merge.

  """
  # Instance statuses that count as "running" when checking the mergees
  # (compared against "gnt-instance list -o status" output lines)
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
142
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)
    @raise NotImplementedError: If restart strategy is L{_RESTART_UP}

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched ssh keys and configs; removed by Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    # Our own cluster name, queried from the local master daemon
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Restarting only previously-running instances is not implemented yet
    if self.restart == _RESTART_UP:
      raise NotImplementedError
167

    
168
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    For every mergee cluster this fetches the remote root ssh private key,
    the list of online nodes, the list of instances and the master node
    name, and records them as a L{MergerData} entry in C{self.merger_data}.

    @raise errors.CommandError: If a cluster is merged with itself
    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the key locally (owner-only permissions) for later commands
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-headers --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(",") for line in result.stdout.splitlines()]
      # Keep only online nodes ("N" in the offline column)
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-headers",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # The remote master node name is read from the cluster's ssconf file
      path = utils.PathJoin(pathutils.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node))
224

    
225
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    Appends our own root public ssh key to the authorized_keys file of
    every node of every mergee cluster, so later commands no longer need
    the fetched private keys.

    @raise errors.RemoteError: If appending the key fails on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)
    # Heredoc append; identical command for every node
    append_cmd = ("cat >> %s << '!EOF.'\n%s!EOF.\n" % (auth_keys, pub_key))

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, append_cmd,
                              private_key=data.key_path, max_attempts=3)
        if not result.failed:
          continue
        raise errors.RemoteError("Unable to add our public key to %s in %s."
                                 " Fail reason: %s; output: %s" %
                                 (node, data.cluster, result.fail_reason,
                                  result.output))
246

    
247
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: How often the command is retried until it
                         succeeds; at least one attempt is always made

    @return: The result of the last attempt

    """
    # max(1, ...) guards against max_attempts < 1, which would leave
    # "result" unbound and raise UnboundLocalError on return
    for _ in range(max(1, max_attempts)):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result
265

    
266
  def _CheckRunningInstances(self):
    """Checks if on the clusters to be merged there are running instances

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    def _ClusterHasRunning(cluster):
      # One instance status per output line; any overlap with the
      # running statuses means the cluster still has live instances
      result = self._RunCmd(cluster, "gnt-instance list -o status")
      statuses = result.output.splitlines()
      return bool(self.RUNNING_STATUSES.intersection(statuses))

    # any() short-circuits on the first cluster with running instances,
    # just as the original early return did
    return any(_ClusterHasRunning(cluster) for cluster in self.clusters)
279

    
280
  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    @raise errors.RemoteError: If the shutdown fails on any mergee

    """
    shutdown_cmd = "gnt-instance shutdown --all --force-multiple"

    for cluster in self.clusters:
      result = self._RunCmd(cluster, shutdown_cmd)
      if not result.failed:
        continue
      raise errors.RemoteError("Unable to stop instances on %s."
                               " Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))
292

    
293
  def _DisableWatcher(self):
    """Disable watch on all merging clusters, including ourself.

    @raise errors.RemoteError: If pausing the watcher fails anywhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period

    # "localhost" first: our own watcher has to be paused as well
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, pause_cmd)
      if not result.failed:
        continue
      raise errors.RemoteError("Unable to pause watcher on %s."
                               " Fail reason: %s; output: %s" %
                               (cluster, result.fail_reason, result.output))
305

    
306
  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: If deactivating a master IP fails

    """
    for data in self.merger_data:
      result = self._RunCmd(data.master_node,
                            "gnt-cluster deactivate-master-ip --yes")
      if not result.failed:
        continue
      raise errors.RemoteError("Unable to remove master IP on %s."
                               " Fail reason: %s; output: %s" %
                               (data.master_node,
                                result.fail_reason,
                                result.output))
320

    
321
  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    @raise errors.RemoteError: If stopping the daemons fails on any node

    """
    cmd = "%s stop-all" % pathutils.DAEMON_UTIL
    all_nodes = [node for data in self.merger_data for node in data.nodes]

    for node in all_nodes:
      result = self._RunCmd(node, cmd, max_attempts=3)
      if result.failed:
        raise errors.RemoteError("Unable to stop daemons on %s."
                                 " Fail reason: %s; output: %s." %
                                 (node, result.fail_reason, result.output))
334

    
335
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    @raise errors.RemoteError: If reading the remote config fails

    """
    fetch_cmd = "cat %s" % pathutils.CLUSTER_CONF_FILE

    for data in self.merger_data:
      result = self._RunCmd(data.cluster, fetch_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      # Keep a local copy named after the cluster it came from;
      # _MergeConfig() reads it back via data.config_path
      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)
354

    
355
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
367

    
368
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    Adds all mergee nodes (offlined; they are reonlined at readd time) and
    all mergee instances to the local configuration, reallocating DRBD
    ports so they do not clash with ports already used locally.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be uniq over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        # Each AddNode call needs a unique execution-context id
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            # NOTE(review): index 2 of logical_id appears to hold the DRBD
            # port; it is rewritten to the freshly allocated local port
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            # NOTE(review): indices 1 and 3 of physical_id are both set to
            # the same new port — presumably the ports of both DRBD sides
            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
409

    
410
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Differences are logged; depending on the chosen strategy
    (C{self.params}) they either only warn or count as errors. If any
    errors were counted, the merge is aborted by raising at the end.

    @param my_config: The local cluster's config writer
    @param other_config: The mergee cluster's config writer
    @raise errors.ConfigurationError: If incompatible values were found

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # Parameters in this list always count as errors, regardless of the
    # --parameter-conflicts strategy
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    # Union of both clusters' reserved logical volumes
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    # Take over OS parameters for OSes unknown locally; conflicting ones
    # are only errors in strict mode
    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
554

    
555
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Look up the per-OS hypervisor parameters for one OS/hypervisor pair.

    @return: The parameter value, or None when the OS (or that
      hypervisor for the OS) has no specific parameters

    """
    if os_name not in cluster.os_hvp:
      return None
    return cluster.os_hvp[os_name].get(hyp, None)
561

    
562
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    Name conflicts are resolved according to C{self.groups}: abort when no
    strategy was given, rename the remote group, or move the remote
    group's nodes into the same-named local group.

    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy ([:]): membership is modified in the loop
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    # All remaining (non-conflicting or renamed) groups are added as-is
    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
625

    
626
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      # Forwarded by daemon-util to ganeti-masterd; used right after the
      # config merge, when a majority vote could not yet succeed
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))
643

    
644
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    all_nodes = [node for data in self.merger_data for node in data.nodes]

    for node in all_nodes:
      logging.info("Readding node %s", node)
      result = utils.RunCmd(["gnt-node", "add", "--readd",
                             "--no-ssh-key-check", node])
      if result.failed:
        # A failed readd is logged but does not abort the merge
        logging.error("%s failed to be readded. Reason: %s, output: %s",
                      node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))
664

    
665
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
678

    
679
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails on verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if not result.failed:
      return
    raise errors.CommandError("Verification of cluster failed."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
692

    
693
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions, printed if a step fails before
    # the point of no return
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     pathutils.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # no_vote: other master candidates do not have the merged config
      # yet, so a regular (voted) start would fail
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        # Substitution values for the %(...)s placeholders in rbsteps
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
773

    
774
  def Cleanup(self):
    """Clean up our environment.

    This cleans up remote private keys and configs and after that
    deletes the temporary directory.

    """
    # work_dir holds the fetched ssh keys and the remote config copies
    shutil.rmtree(self.work_dir)
782

    
783

    
784
def SetupLogging(options):
  """Setting up logging infrastructure.

  Installs a stderr stream handler on the root logger; the handler's
  level is derived from the --debug/--verbose options.

  @param options: Parsed command line options

  """
  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))

  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
804

    
805

    
806
def main():
  """Main routine.

  Parses the command line, sets up logging and drives the L{Merger}
  through setup, merge and cleanup.

  @return: Exit code (C{constants.EXIT_SUCCESS} or C{constants.EXIT_FAILURE})

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)
  parser.add_option(RESTART_OPT)
  parser.add_option(PARAMS_OPT)
  parser.add_option(SKIP_STOP_INSTANCES_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  # UniqueSequence drops duplicate cluster names while keeping order
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups, options.restart, options.params,
                          options.stop_instances)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    # Always remove the temporary work directory, even on failure
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())