#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"

PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
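# Example invocation (hypothetical cluster names):
#   cluster-merge --groups=rename cluster-b.example.com cluster-c.example.com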


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list which may contain other (nested) lists
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for the merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on the merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handles the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize the object with sane defaults and the required information.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time for which the watcher shall be disabled
    @param groups: How to handle group conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets us up in preparation for the merger.
    It makes the initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication or while
      fetching data

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch the remotes' private keys
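    # (each fetched key is later used as the SSH identity for the commands
    # run on that cluster and its nodes)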
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to each remote's authorized_keys for
    further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)

    for data in self.merger_data:
      for node in data.nodes:
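        # Append our public key through a quoted heredoc so the key text is
        # passed to the remote shell verbatim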
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wraps SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
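      # A paused watcher cannot restart the instances and daemons we are
      # about to shut down while the merge is in progress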
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to stop the master daemon

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
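        # Give each DRBD disk a port allocated from the local cluster's pool
        # (stored in logical_id[2] and physical_id[1]/[3]) so it cannot clash
        # with ports already in use locally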
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups.

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s", self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s",
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict", grp.name, new_name)
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups", other_grp.name)
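          # Iterate over a copy of the member list: removing a node from the
          # group below mutates the original list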
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            other_config._UnlockedRemoveNodeFromGroup(node)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)

            # Access to a protected member of a client class
            # pylint: disable-msg=W0212
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      # TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should masterd be started without voting? Default: False
    @raise errors.CommandError: If unable to start the daemon.

    """
    env = {}
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
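        # --readd re-registers a node already present in the (merged) config;
        # SSH host key checking and the usual join safeguards are skipped
        # since the node was originally set up by another cluster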
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify cluster health.

    @raise errors.CommandError: If the cluster verification fails

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and informs the user about the
    steps taken. It also keeps track of rollback steps to undo everything.

    """
    rbsteps = []
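    # Rollback steps are collected as format strings; the %(clusters)s and
    # %(nodes)s placeholders are expanded only if they have to be printed
    # after a failure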
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
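      # Start without voting: the nodes just merged into the config do not
      # know this master yet, so a normal startup vote could fail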
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. Merge cannot easily"
                      " be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to roll back, do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to roll back.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This cleans up the fetched remote private keys and configs, and after
    that deletes the temporary directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Sets up the logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.ERROR)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                 prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())