root / tools / cluster-merge @ 3a969900
#!/usr/bin/python
#

# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""

# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge

import logging
import os
import optparse
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils


_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
_CLUSTERMERGE_ECID = "clustermerge-ecid"

PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
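
# Example invocation (hostname is illustrative), run on the master node of the
# cluster that absorbs the other(s):
#
#   cluster-merge --watcher-pause-period 3600 --groups rename \
#     cluster-b.example.com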


def Flatten(unflattened_list):
  """Flattens a list.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list


class MergerData(object):
  """Container class to hold data used for the merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on the merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path


class Merger(object):
  """Handles the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize the object with sane defaults and the required information.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time in seconds the watcher shall be disabled for
    @param groups: How to handle group conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
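    # The working directory holds the ssh keys and configuration files
    # fetched from the merging clusters; it is removed again in Cleanup().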
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    self.groups = groups

  def Setup(self):
    """Sets up our end so we can do the merger.

    This method sets us up in preparation for the merger. It makes the
    initial contact and gathers the information needed.

    @raise errors.RemoteError: for errors in communication/grabbing

    """
    (remote_path, _, _) = ssh.GetUserFiles("root")

    if self.cluster_name in self.clusters:
      raise errors.CommandError("Cannot merge cluster %s with itself" %
                                self.cluster_name)

    # Fetch the remotes' private keys
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
                            ask_key=False)
      if result.failed:
        raise errors.RemoteError("There was an error while grabbing ssh private"
                                 " key from %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

      key_path = utils.PathJoin(self.work_dir, cluster)
      utils.WriteFile(key_path, mode=0600, data=result.stdout)

      result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of nodes from %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      nodes = result.stdout.splitlines()

      result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
                            private_key=key_path)
      if result.failed:
        raise errors.RemoteError("Unable to retrieve list of instances from"
                                 " %s. Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))
      instances = result.stdout.splitlines()

      self.merger_data.append(MergerData(cluster, key_path, nodes, instances))

  def _PrepareAuthorizedKeys(self):
    """Prepare the authorized_keys on every merging node.

    This method adds our public key to each remote node's authorized_keys
    for further communication.

    """
    (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
    pub_key = utils.ReadFile(pub_key_file)
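
    # The loop below appends our public key to each node's authorized_keys
    # file through a shell here-document (delimited by '!EOF.'), using the
    # private key fetched for that cluster in Setup().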

    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
                                     (auth_keys, pub_key)),
                              private_key=data.key_path)

        if result.failed:
          raise errors.RemoteError("Unable to add our public key to %s in %s."
                                   " Fail reason: %s; output: %s" %
                                   (node, data.cluster, result.fail_reason,
                                    result.output))

  def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
              strict_host_check=False, private_key=None, batch=True,
              ask_key=False):
    """Wraps SshRunner.Run with default parameters.

    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.

    """
    return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
                               use_cluster_key=use_cluster_key,
                               strict_host_check=strict_host_check,
                               private_key=private_key, batch=batch,
                               ask_key=ask_key)

  def _StopMergingInstances(self):
    """Stop instances on the merging clusters.

    """
    for cluster in self.clusters:
      result = self._RunCmd(cluster, "gnt-instance shutdown --all"
                                     " --force-multiple")

      if result.failed:
        raise errors.RemoteError("Unable to stop instances on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _DisableWatcher(self):
    """Disable the watcher on all merging clusters, including ourselves.

    """
    for cluster in ["localhost"] + self.clusters:
      result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
                                     self.pause_period)

      if result.failed:
        raise errors.RemoteError("Unable to pause watcher on %s."
                                 " Fail reason: %s; output: %s" %
                                 (cluster, result.fail_reason, result.output))

  def _StopDaemons(self):
    """Stop all daemons on the merging nodes.

    """
    cmd = "%s stop-all" % constants.DAEMON_UTIL
    for data in self.merger_data:
      for node in data.nodes:
        result = self._RunCmd(node, cmd)

        if result.failed:
          raise errors.RemoteError("Unable to stop daemons on %s."
                                   " Fail reason: %s; output: %s." %
                                   (node, result.fail_reason, result.output))

  def _FetchRemoteConfig(self):
    """Fetches and stores the remote cluster config from the master.

    This step is needed before we can merge the config.

    """
    for data in self.merger_data:
      result = self._RunCmd(data.cluster, "cat %s" %
                                          constants.CLUSTER_CONF_FILE)

      if result.failed:
        raise errors.RemoteError("Unable to retrieve remote config on %s."
                                 " Fail reason: %s; output: %s" %
                                 (data.cluster, result.fail_reason,
                                  result.output))

      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
                                        data.cluster)
      utils.WriteFile(data.config_path, data=result.stdout)

  # R0201: Method could be a function
  def _KillMasterDaemon(self): # pylint: disable-msg=R0201
    """Kills the local master daemon.

    @raise errors.CommandError: If unable to kill

    """
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
    if result.failed:
      raise errors.CommandError("Unable to stop master daemons."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _MergeConfig(self):
    """Merges all foreign config into our own config.

    """
    my_config = config.ConfigWriter(offline=True)
    fake_ec_id = 0 # Needs to be unique over the whole config merge

    for data in self.merger_data:
      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
      self._MergeNodeGroups(my_config, other_config)

      for node in other_config.GetNodeList():
        node_info = other_config.GetNodeInfo(node)
        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

      for instance in other_config.GetInstanceList():
        instance_info = other_config.GetInstanceInfo(instance)

        # Update the DRBD port assignments
        # This is a little bit hackish
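        # logical_id[2] and physical_id[1]/physical_id[3] carry the DRBD
        # network port; a fresh port is allocated from the local cluster so
        # it does not clash with ports already in use here.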
        for dsk in instance_info.disks:
          if dsk.dev_type in constants.LDS_DRBD:
            port = my_config.AllocatePort()

            logical_id = list(dsk.logical_id)
            logical_id[2] = port
            dsk.logical_id = tuple(logical_id)

            physical_id = list(dsk.physical_id)
            physical_id[1] = physical_id[3] = port
            dsk.physical_id = tuple(physical_id)

        my_config.AddInstance(instance_info,
                              _CLUSTERMERGE_ECID + str(fake_ec_id))
        fake_ec_id += 1

  # R0201: Method could be a function
  def _MergeNodeGroups(self, my_config, other_config):
    """Adds foreign node groups.

    ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.

    """
    # pylint: disable-msg=R0201
    logging.info("Node group conflict strategy: %s" % self.groups)

    my_grps = my_config.GetAllNodeGroupsInfo().values()
    other_grps = other_config.GetAllNodeGroupsInfo().values()

    # Check for node group naming conflicts:
    conflicts = []
    for other_grp in other_grps:
      for my_grp in my_grps:
        if other_grp.name == my_grp.name:
          conflicts.append(other_grp)

    if conflicts:
      conflict_names = utils.CommaJoin([g.name for g in conflicts])
      logging.info("Node groups in both local and remote cluster: %s" %
                   conflict_names)

      # User hasn't specified how to handle conflicts
      if not self.groups:
        raise errors.CommandError("The following node group(s) are in both"
                                  " clusters, and no merge strategy has been"
                                  " supplied (see the --groups option): %s" %
                                  conflict_names)

      # User wants to rename conflicts
      elif self.groups == _GROUPS_RENAME:
        for grp in conflicts:
          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
          logging.info("Renaming remote node group from %s to %s"
                       " to resolve conflict" % (grp.name, new_name))
          grp.name = new_name

      # User wants to merge conflicting groups
      elif self.groups == _GROUPS_MERGE:
        for other_grp in conflicts:
          logging.info("Merging local and remote '%s' groups" % other_grp.name)
          for node_name in other_grp.members[:]:
            node = other_config.GetNodeInfo(node_name)
            other_config._UnlockedRemoveNodeFromGroup(node)

            my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
            node.group = my_grp_uuid
          # Remove from the list of groups to add
          other_grps.remove(other_grp)

    for grp in other_grps:
      # TODO: handle node group conflicts
      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)

  # R0201: Method could be a function
  def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
    """Starts the local master daemon.

    @param no_vote: Should the master daemon be started without voting?
        default: False
    @raise errors.CommandError: If unable to start the daemon.

    """
    env = {}
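    # no_vote starts masterd with --no-voting --yes-do-it. Merge() uses this
    # right after the config merge, when the merged configuration already
    # lists nodes that have not been re-added yet, so a regular voting
    # startup would presumably not reach a majority.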
    if no_vote:
      env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"

    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
    if result.failed:
      raise errors.CommandError("Couldn't start ganeti master."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def _ReaddMergedNodesAndRedist(self):
    """Readds all merging nodes and makes sure their config is up-to-date.

    @raise errors.CommandError: If anything fails.

    """
    for data in self.merger_data:
      for node in data.nodes:
        result = utils.RunCmd(["gnt-node", "add", "--readd",
                               "--no-ssh-key-check", "--force-join", node])
        if result.failed:
          raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                    " output: %s" % (node, result.fail_reason,
                                                     result.output))

    result = utils.RunCmd(["gnt-cluster", "redist-conf"])
    if result.failed:
      raise errors.CommandError("Redistribution failed. Fail reason: %s;"
                                " output: %s" % (result.fail_reason,
                                                 result.output))

  # R0201: Method could be a function
  def _StartupAllInstances(self): # pylint: disable-msg=R0201
    """Starts up all instances (locally).

    @raise errors.CommandError: If unable to start the instances

    """
    result = utils.RunCmd(["gnt-instance", "startup", "--all",
                           "--force-multiple"])
    if result.failed:
      raise errors.CommandError("Unable to start all instances."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  # R0201: Method could be a function
  def _VerifyCluster(self): # pylint: disable-msg=R0201
    """Runs gnt-cluster verify to verify the cluster's health.

    @raise errors.CommandError: If the cluster fails verification

    """
    result = utils.RunCmd(["gnt-cluster", "verify"])
    if result.failed:
      raise errors.CommandError("Verification of cluster failed."
                                " Fail reason: %s; output: %s" %
                                (result.fail_reason, result.output))

  def Merge(self):
    """Does the actual merge.

    It runs all the steps in the right order and keeps the user informed
    about the steps taken. It also keeps track of rollback steps to undo
    everything.

    """
    rbsteps = []
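    # rbsteps collects human readable rollback instructions; the %(nodes)s
    # and %(clusters)s placeholders are filled in from the info dict in the
    # error handler below.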
    try:
      logging.info("Pre cluster verification")
      self._VerifyCluster()

      logging.info("Prepare authorized_keys")
      rbsteps.append("Remove our key from authorized_keys on nodes:"
                     " %(nodes)s")
      self._PrepareAuthorizedKeys()

      rbsteps.append("Start all instances again on the merging"
                     " clusters: %(clusters)s")
      logging.info("Stopping merging instances (takes a while)")
      self._StopMergingInstances()

      logging.info("Disable watcher")
      self._DisableWatcher()
      logging.info("Stop daemons on merging nodes")
      self._StopDaemons()
      logging.info("Merging config")
      self._FetchRemoteConfig()

      logging.info("Stopping master daemon")
      self._KillMasterDaemon()

      rbsteps.append("Restore %s from another master candidate"
                     " and restart master daemon" %
                     constants.CLUSTER_CONF_FILE)
      self._MergeConfig()
      self._StartMasterDaemon(no_vote=True)

      # Point of no return, delete rbsteps
      del rbsteps[:]

      logging.warning("We are at the point of no return. The merge cannot"
                      " easily be undone after this point.")
      logging.info("Readd nodes")
      self._ReaddMergedNodesAndRedist()

      logging.info("Merge done, restart master daemon normally")
      self._KillMasterDaemon()
      self._StartMasterDaemon()

      logging.info("Starting instances again")
      self._StartupAllInstances()
      logging.info("Post cluster verification")
      self._VerifyCluster()
    except errors.GenericError, e:
      logging.exception(e)

      if rbsteps:
        nodes = Flatten([data.nodes for data in self.merger_data])
        info = {
          "clusters": self.clusters,
          "nodes": nodes,
          }
        logging.critical("In order to roll back, do the following:")
        for step in rbsteps:
          logging.critical("  * %s", step % info)
      else:
        logging.critical("Nothing to roll back.")

      # TODO: Keep track of steps done for a flawless resume?

  def Cleanup(self):
    """Clean up our environment.

    This removes the fetched private keys and configuration files by
    deleting the temporary working directory.

    """
    shutil.rmtree(self.work_dir)


def SetupLogging(options):
  """Sets up the logging infrastructure.

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.ERROR)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)


def main():
  """Main routine.

  """
  program = os.path.basename(sys.argv[0])

  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                        " [--watcher-pause-period SECONDS]"
                                        " [--groups [%s|%s]]"
                                        " <cluster> [<cluster...>]" %
                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                        prog=program)
  parser.add_option(cli.DEBUG_OPT)
  parser.add_option(cli.VERBOSE_OPT)
  parser.add_option(PAUSE_PERIOD_OPT)
  parser.add_option(GROUPS_OPT)

  (options, args) = parser.parse_args()

  SetupLogging(options)

  if not args:
    parser.error("No clusters specified")

  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
                          options.groups)
  try:
    try:
      cluster_merger.Setup()
      cluster_merger.Merge()
    except errors.GenericError, e:
      logging.exception(e)
      return constants.EXIT_FAILURE
  finally:
    cluster_merger.Cleanup()

  return constants.EXIT_SUCCESS


if __name__ == "__main__":
  sys.exit(main())