Statistics
| Branch: | Tag: | Revision:

root / tools / cluster-merge @ b459a848

History | View | Annotate | Download (31.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Tool to merge two or more clusters together.
22

    
23
The clusters have to run the same version of Ganeti!
24

    
25
"""
26

    
27
# pylint: disable=C0103
28
# C0103: Invalid name cluster-merge
29

    
30
import logging
31
import os
32
import optparse
33
import shutil
34
import sys
35
import tempfile
36

    
37
from ganeti import cli
38
from ganeti import config
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import ssh
42
from ganeti import utils
43
from ganeti import netutils
44

    
45

    
46
# Strategies for handling node group name conflicts (--groups option)
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Prefix for the ec_id values passed to the config Add* calls during merge
_CLUSTERMERGE_ECID = "clustermerge-ecid"
# Strategies for restarting instances after the merge (--restart option)
_RESTART_ALL = "all"
_RESTART_UP = "up"
_RESTART_NONE = "none"
_RESTART_CHOICES = (_RESTART_ALL, _RESTART_UP, _RESTART_NONE)
# Strategies for handling conflicting cluster parameter values
_PARAMS_STRICT = "strict"
_PARAMS_WARN = "warn"
_PARAMS_CHOICES = (_PARAMS_STRICT, _PARAMS_WARN)


PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
PARAMS_OPT = cli.cli_option("--parameter-conflicts", default=_PARAMS_STRICT,
                            metavar="STRATEGY",
                            choices=_PARAMS_CHOICES,
                            dest="params",
                            help=("How to handle params that have"
                                  " different values (One of: %s/%s)" %
                                  _PARAMS_CHOICES))

RESTART_OPT = cli.cli_option("--restart", default=_RESTART_ALL,
                             metavar="STRATEGY",
                             choices=_RESTART_CHOICES,
                             dest="restart",
                             # Help text fixed: previously read "restarting
                             # instances same name", a copy/paste leftover
                             # from GROUPS_OPT
                             help=("How to handle restarting instances"
                                   " (One of: %s/%s/%s)" %
                                   _RESTART_CHOICES))

SKIP_STOP_INSTANCES_OPT = \
  cli.cli_option("--skip-stop-instances", default=True, action="store_false",
                 dest="stop_instances",
                 help=("Don't stop the instances on the clusters, just check "
                       "that none is running"))
90

    
91

    
92
def Flatten(unflattened_list):
  """Flattens a list.

  Nested lists are expanded recursively, depth-first; non-list elements
  (including tuples) are kept as-is.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened = []

  for element in unflattened_list:
    if not isinstance(element, list):
      flattened.append(element)
    else:
      flattened.extend(Flatten(element))
  return flattened
107

    
108

    
109
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, master_node,
               master_ip, config_path=None):
    """Remember the per-cluster facts gathered for one mergee cluster.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of online nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param master_node: Name of the master node
    @param master_ip: Cluster IP
    @param config_path: Path to the merging cluster config; may be filled
      in later when omitted here

    """
    self.cluster = cluster
    self.config_path = config_path
    self.key_path = key_path
    self.master_ip = master_ip
    self.master_node = master_node
    self.nodes = nodes
    self.instances = instances
133

    
134

    
135
class Merger(object):
  """Handling the merge.

  """
  # Instance states that count as "running" when checking whether the
  # mergee clusters still have active instances
  RUNNING_STATUSES = frozenset([
    constants.INSTST_RUNNING,
    constants.INSTST_ERRORUP,
    ])
143

    
144
  def __init__(self, clusters, pause_period, groups, restart, params,
               stop_instances):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts
    @param restart: How to handle instance restart
    @param params: How to handle parameter conflicts (strict or warn)
    @param stop_instances: Indicates whether the instances must be stopped
                           (True) or if the Merger must only check if no
                           instances are running on the mergee clusters (False)

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    # Scratch directory for fetched keys and configs; removed in Cleanup()
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups
    self.restart = restart
    self.params = params
    self.stop_instances = stop_instances
    # Restarting only the previously-running instances is not implemented
    if self.restart == _RESTART_UP:
      raise NotImplementedError
169

    
170
  def Setup(self):
    """Sets up our end so we can do the merger.

    This method is setting us up as a preparation for the merger.
    It makes the initial contact and gathers information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    # Merging a cluster into itself makes no sense and would corrupt it
    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch remotes private key
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      # Store the mergee's key locally (mode 0600, root only); later
      # commands authenticate against its nodes with this key
      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      # Collect the online nodes of the mergee ("N" in the offline column)
      result = self._RunCmd(cluster, "gnt-node list -o name,offline"
                            " --no-header --separator=,", private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
      nodes = [node_status[0] for node_status in nodes_statuses
               if node_status[1] == "N"]

      # Collect the instance names of the mergee
      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      # Read the mergee's master node name from its ssconf
      path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NODE)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master node name from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_node = result.stdout.strip()

      # Read the mergee's master IP from its ssconf
      path = utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_IP)
      result = self._RunCmd(cluster, "cat %s" % path, private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve the master IP from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      master_ip = result.stdout.strip()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances,
                                         master_node, master_ip))
235

    
236
  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method add our public key to remotes authorized_key for further
    communication.

    @raise errors.RemoteError: If appending the key fails on a node

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
        # Append our public key via a shell here-document; retried up to
        # three times in case a node is temporarily unreachable
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path, max_attempts=3)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))
257

    
258
  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False, max_attempts=1):
    """Wrapping SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    @param max_attempts: Number of times to retry the command on failure;
                         values below 1 are treated as a single attempt
                         (previously they would leave the result variable
                         unbound and raise a NameError)
    @return: The result of the last attempt

    """
    result = None
    # Clamp to at least one attempt so "result" is always bound
    for _ in range(max(1, max_attempts)):
      result = self.ssh_runner.Run(hostname=hostname, command=command,
                                   user=user, use_cluster_key=use_cluster_key,
                                   strict_host_check=strict_host_check,
                                   private_key=private_key, batch=batch,
                                   ask_key=ask_key)
      if not result.failed:
        break

    return result
276

    
277
  def _CheckRunningInstances(self):
    """Checks whether any mergee cluster still has running instances.

    @rtype: boolean
    @return: True if there are running instances, False otherwise

    """
    for cluster in self.clusters:
      status_output = self._RunCmd(cluster,
                                   "gnt-instance list -o status").output
      # Any line matching a "running" state means the cluster is not idle
      if self.RUNNING_STATUSES.intersection(status_output.splitlines()):
        return True

    return False
290

    
291
  def _StopMergingInstances(self):
    """Shut down every instance on all mergee clusters.

    @raise errors.RemoteError: If the shutdown fails on a cluster

    """
    shutdown_cmd = "gnt-instance shutdown --all --force-multiple"
    for cluster in self.clusters:
      result = self._RunCmd(cluster, shutdown_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
303

    
304
  def _DisableWatcher(self):
    """Pause the watcher on every involved cluster, including our own.

    @raise errors.RemoteError: If pausing the watcher fails somewhere

    """
    pause_cmd = "gnt-cluster watcher pause %d" % self.pause_period
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, pause_cmd)
      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
316

    
317
  def _RemoveMasterIps(self):
    """Removes the master IPs from the master nodes of each cluster.

    @raise errors.RemoteError: If removing an IP fails on a master node

    """
    for data in self.merger_data:
      master_ip_family = netutils.IPAddress.GetAddressFamily(data.master_ip)
      # Pick the prefix length matching the address family (IPv4 default,
      # IPv6 when the master IP is an IPv6 address)
      master_ip_len = netutils.IP4Address.iplen
      if master_ip_family == netutils.IP6Address.family:
        master_ip_len = netutils.IP6Address.iplen
      # Not using constants.IP_COMMAND_PATH because the command might run on a
      # machine in which the ip path is different, so it's better to rely on
      # $PATH.
      cmd = "ip address del %s/%s dev $(cat %s)" % (
             data.master_ip,
             master_ip_len,
             utils.PathJoin(constants.DATA_DIR, "ssconf_%s" %
                            constants.SS_MASTER_NETDEV))
      # Retried since the master node may be momentarily unreachable
      result = self._RunCmd(data.master_node, cmd, max_attempts=3)
      if result.failed:
        raise errors.RemoteError("Unable to remove master IP on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.master_node,
                                  result.fail_reason,
                                  result.output))
341

    
342
  def _StopDaemons(self):
    """Stop all Ganeti daemons on every merging node.

    @raise errors.RemoteError: If stopping the daemons fails on a node

    """
    stop_cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, stop_cmd, max_attempts=3)
        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))
355

    
356
  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      fetch_result = self._RunCmd(data.cluster,
                                  "cat %s" % constants.CLUSTER_CONF_FILE)
      if fetch_result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output %s" %
                                 (data.cluster, fetch_result.fail_reason,
                                  fetch_result.output))

      # Keep a local copy in the work dir; the merge step reads it from
      # data.config_path
      data.config_path = utils.PathJoin(self.work_dir,
                                        "%s_config.data" % data.cluster)
      utils.WriteFile(data.config_path, data=fetch_result.stdout)
375

    
376
  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable=R0201
    """Stops the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if not result.failed:
      return
    raise errors.CommandError("Unable to stop master daemons."
                              " Fail reason: %s; output: %s" %
                              (result.fail_reason, result.output))
388

    
389
  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeClusterConfigs(my_config, other_config)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        # Offline the node, it will be reonlined later at node readd
        node_info.master_candidate = False
        node_info.drained = False
        node_info.offline = True
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            # Allocate a fresh port from our own config to avoid clashes
            # with ports already used locally
            port = my_config.AllocatePort()

            # logical_id/physical_id are tuples, so rebuild them with the
            # new port spliced in
            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1
430

    
431
  def _MergeClusterConfigs(self, my_config, other_config):
    """Checks that all relevant cluster parameters are compatible

    Logs every difference found; parameters in the strict list (or all
    parameters when running with --parameter-conflicts=strict) count as
    errors and abort the merge.

    @param my_config: Our own cluster config
    @param other_config: The mergee's cluster config
    @raise errors.ConfigurationError: If incompatible values were found

    """
    my_cluster = my_config.GetClusterInfo()
    other_cluster = other_config.GetClusterInfo()
    err_count = 0

    #
    # Generic checks
    #
    check_params = [
      "beparams",
      "default_iallocator",
      "drbd_usermode_helper",
      "hidden_os",
      "maintain_node_health",
      "master_netdev",
      "ndparams",
      "nicparams",
      "primary_ip_family",
      "tags",
      "uid_pool",
      ]
    # These always count as errors, regardless of --parameter-conflicts
    check_params_strict = [
      "volume_group_name",
    ]
    if constants.ENABLE_FILE_STORAGE:
      check_params_strict.append("file_storage_dir")
    if constants.ENABLE_SHARED_FILE_STORAGE:
      check_params_strict.append("shared_file_storage_dir")
    check_params.extend(check_params_strict)

    if self.params == _PARAMS_STRICT:
      params_strict = True
    else:
      params_strict = False

    for param_name in check_params:
      my_param = getattr(my_cluster, param_name)
      other_param = getattr(other_cluster, param_name)
      if my_param != other_param:
        logging.error("The value (%s) of the cluster parameter %s on %s"
                      " differs to this cluster's value (%s)",
                      other_param, param_name, other_cluster.cluster_name,
                      my_param)
        if params_strict or param_name in check_params_strict:
          err_count += 1

    #
    # Custom checks
    #

    # Check default hypervisor
    my_defhyp = my_cluster.enabled_hypervisors[0]
    other_defhyp = other_cluster.enabled_hypervisors[0]
    if my_defhyp != other_defhyp:
      logging.warning("The default hypervisor (%s) differs on %s, new"
                      " instances will be created with this cluster's"
                      " default hypervisor (%s)", other_defhyp,
                      other_cluster.cluster_name, my_defhyp)

    if (set(my_cluster.enabled_hypervisors) !=
        set(other_cluster.enabled_hypervisors)):
      logging.error("The set of enabled hypervisors (%s) on %s differs to"
                    " this cluster's set (%s)",
                    other_cluster.enabled_hypervisors,
                    other_cluster.cluster_name, my_cluster.enabled_hypervisors)
      err_count += 1

    # Check hypervisor params for hypervisors we care about
    for hyp in my_cluster.enabled_hypervisors:
      for param in my_cluster.hvparams[hyp]:
        my_value = my_cluster.hvparams[hyp][param]
        other_value = other_cluster.hvparams[hyp][param]
        if my_value != other_value:
          logging.error("The value (%s) of the %s parameter of the %s"
                        " hypervisor on %s differs to this cluster's parameter"
                        " (%s)",
                        other_value, param, hyp, other_cluster.cluster_name,
                        my_value)
          if params_strict:
            err_count += 1

    # Check os hypervisor params for hypervisors we care about
    for os_name in set(my_cluster.os_hvp.keys() + other_cluster.os_hvp.keys()):
      for hyp in my_cluster.enabled_hypervisors:
        my_os_hvp = self._GetOsHypervisor(my_cluster, os_name, hyp)
        other_os_hvp = self._GetOsHypervisor(other_cluster, os_name, hyp)
        if my_os_hvp != other_os_hvp:
          logging.error("The OS parameters (%s) for the %s OS for the %s"
                        " hypervisor on %s differs to this cluster's parameters"
                        " (%s)",
                        other_os_hvp, os_name, hyp, other_cluster.cluster_name,
                        my_os_hvp)
          if params_strict:
            err_count += 1

    #
    # Warnings
    #
    if my_cluster.modify_etc_hosts != other_cluster.modify_etc_hosts:
      logging.warning("The modify_etc_hosts value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_etc_hosts,
                      other_cluster.cluster_name,
                      my_cluster.modify_etc_hosts)

    if my_cluster.modify_ssh_setup != other_cluster.modify_ssh_setup:
      logging.warning("The modify_ssh_setup value (%s) differs on %s,"
                      " this cluster's value (%s) will take precedence",
                      other_cluster.modify_ssh_setup,
                      other_cluster.cluster_name,
                      my_cluster.modify_ssh_setup)

    #
    # Actual merging
    #
    my_cluster.reserved_lvs = list(set(my_cluster.reserved_lvs +
                                       other_cluster.reserved_lvs))

    if my_cluster.prealloc_wipe_disks != other_cluster.prealloc_wipe_disks:
      logging.warning("The prealloc_wipe_disks value (%s) on %s differs to this"
                      " cluster's value (%s). The least permissive value (%s)"
                      " will be used", other_cluster.prealloc_wipe_disks,
                      other_cluster.cluster_name,
                      my_cluster.prealloc_wipe_disks, True)
      my_cluster.prealloc_wipe_disks = True

    for os_, osparams in other_cluster.osparams.items():
      if os_ not in my_cluster.osparams:
        my_cluster.osparams[os_] = osparams
      elif my_cluster.osparams[os_] != osparams:
        logging.error("The OS parameters (%s) for the %s OS on %s differs to"
                      " this cluster's parameters (%s)",
                      osparams, os_, other_cluster.cluster_name,
                      my_cluster.osparams[os_])
        if params_strict:
          err_count += 1

    if err_count:
      raise errors.ConfigurationError("Cluster config for %s has incompatible"
                                      " values, please fix and re-run" %
                                      other_cluster.cluster_name)
575

    
576
  # R0201: Method could be a function
  def _GetOsHypervisor(self, cluster, os_name, hyp): # pylint: disable=R0201
    """Return the OS hypervisor parameters for (os_name, hyp), or None.

    """
    if os_name not in cluster.os_hvp:
      return None
    return cluster.os_hvp[os_name].get(hyp, None)
582

    
583
  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    @param my_config: Our own cluster config
    @param other_config: The mergee's cluster config
    @raise errors.CommandError: If groups conflict and no --groups strategy
      was supplied
    """
    # pylint: disable=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
          # Iterate over a copy ([:]) since members are removed below
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      #TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
646

    
647
  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable=R0201
    """Starts the local master daemon.

    @param no_vote: Should the masterd started without voting? default: False
    @raise errors.CommandError: If unable to start daemon.

    """
    if no_vote:
      env = {"EXTRA_MASTERD_ARGS": "--no-voting --yes-do-it"}
    else:
      env = {}

    start_result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"],
                                env=env)
    if start_result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (start_result.fail_reason,
                                 start_result.output))
664

    
665
  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and make sure their config is up-to-date.

    @raise errors.CommandError: If the config redistribution fails

    """
    readd_cmd = ["gnt-node", "add", "--readd", "--no-ssh-key-check",
                 "--force-join"]
    for data in self.merger_data:
      for node in data.nodes:
        # A failed readd is logged but not fatal; redistribution below
        # can still proceed for the other nodes
        result = utils.RunCmd(readd_cmd + [node])
        if result.failed:
          logging.error("%s failed to be readded. Reason: %s, output: %s",
                        node, result.fail_reason, result.output)

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                result.output))
684

    
685
  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start clusters

    """
    startup_result = utils.RunCmd(["gnt-instance", "startup", "--all",
                                   "--force-multiple"])
    if not startup_result.failed:
      return
    raise errors.CommandError("Unable to start all instances."
                              " Fail reason: %s; output: %s" %
                              (startup_result.fail_reason,
                               startup_result.output))
698

    
699
  # R0201: Method could be a function
  # TODO: make this overridable, for some verify errors
  def _VerifyCluster(self): # pylint: disable=R0201
    """Runs gnt-cluster verify to verify the health.

    @raise errors.CommandError: If the cluster fails on verification

    """
    verify_result = utils.RunCmd(["gnt-cluster", "verify"])
    if verify_result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (verify_result.fail_reason,
                                 verify_result.output))
712

    
713
  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and updates the user about steps
    taken. Also it keeps track of rollback_steps to undo everything.

    """
    # Human-readable rollback instructions; each entry is a %-template
    # later filled with the "info" dict (clusters/nodes) on failure
    rbsteps = []
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      if self.stop_instances:
        logging.info("Stopping merging instances (takes a while)")
        self._StopMergingInstances()
      logging.info("Checking that no instances are running on the mergees")
      instances_running = self._CheckRunningInstances()
      if instances_running:
        raise errors.CommandError("Some instances are still running on the"
                                  " mergees")
      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()
      logging.info("Removing master IPs on mergee master nodes")
      self._RemoveMasterIps()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      # Master must start without voting: the newly merged nodes are
      # still offline at this point
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge can not easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      if self.restart == _RESTART_ALL:
        logging.info("Starting instances again")
        self._StartupAllInstances()
      else:
        logging.info("Not starting instances again")
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to rollback do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to rollback.")

      # TODO: Keep track of steps done for a flawless resume?
793

    
794
  def Cleanup(self):
    """Clean up our environment.

    Deletes the temporary work directory, which holds the private keys
    and configuration files fetched from the mergee clusters.

    """
    shutil.rmtree(self.work_dir)
802

    
803

    
804
def SetupLogging(options):
  """Setting up logging infrastructure.

  Attaches a stderr handler to the root logger; its threshold depends on
  the --debug/--verbose flags.

  @param options: Parsed command line options

  """
  if options.debug:
    level = logging.NOTSET
  elif options.verbose:
    level = logging.INFO
  else:
    level = logging.WARNING

  handler = logging.StreamHandler()
  handler.setFormatter(
    logging.Formatter("%(asctime)s: %(levelname)s %(message)s"))
  handler.setLevel(level)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(handler)
824

    
825

    
826
def main():
827
  """Main routine.
828

    
829
  """
830
  program = os.path.basename(sys.argv[0])
831

    
832
  parser = optparse.OptionParser(usage="%%prog [options...] <cluster...>",
833
                                 prog=program)
834
  parser.add_option(cli.DEBUG_OPT)
835
  parser.add_option(cli.VERBOSE_OPT)
836
  parser.add_option(PAUSE_PERIOD_OPT)
837
  parser.add_option(GROUPS_OPT)
838
  parser.add_option(RESTART_OPT)
839
  parser.add_option(PARAMS_OPT)
840
  parser.add_option(SKIP_STOP_INSTANCES_OPT)
841

    
842
  (options, args) = parser.parse_args()
843

    
844
  SetupLogging(options)
845

    
846
  if not args:
847
    parser.error("No clusters specified")
848

    
849
  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
850
                          options.groups, options.restart, options.params,
851
                          options.stop_instances)
852
  try:
853
    try:
854
      cluster_merger.Setup()
855
      cluster_merger.Merge()
856
    except errors.GenericError, e:
857
      logging.exception(e)
858
      return constants.EXIT_FAILURE
859
  finally:
860
    cluster_merger.Cleanup()
861

    
862
  return constants.EXIT_SUCCESS
863

    
864

    
865
# Script entry point: the exit code propagates the merge result
if __name__ == "__main__":
  sys.exit(main())