Statistics
| Branch: | Tag: | Revision:

root / lib / bootstrap.py @ f2c6673d

History | View | Annotate | Download (27.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions to bootstrap a new cluster.
23

24
"""
25

    
26
import os
27
import os.path
28
import re
29
import logging
30
import time
31

    
32
from ganeti import rpc
33
from ganeti import ssh
34
from ganeti import utils
35
from ganeti import errors
36
from ganeti import config
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40
from ganeti import serializer
41
from ganeti import hypervisor
42
from ganeti import bdev
43
from ganeti import netutils
44
from ganeti import backend
45
from ganeti import luxi
46

    
47

    
48
# ec_id for InitConfig's temporary reservation manager
49
_INITCONF_ECID = "initconfig-ecid"
50

    
51
#: After how many seconds daemon must be responsive
52
_DAEMON_READY_TIMEOUT = 10.0
53

    
54

    
55
def _InitSSHSetup():
56
  """Setup the SSH configuration for the cluster.
57

58
  This generates a dsa keypair for root, adds the pub key to the
59
  permitted hosts and adds the hostkey to its own known hosts.
60

61
  """
62
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
63

    
64
  for name in priv_key, pub_key:
65
    if os.path.exists(name):
66
      utils.CreateBackup(name)
67
    utils.RemoveFile(name)
68

    
69
  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
70
                         "-f", priv_key,
71
                         "-q", "-N", ""])
72
  if result.failed:
73
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
74
                             result.output)
75

    
76
  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
77

    
78

    
79
def GenerateHmacKey(file_name):
80
  """Writes a new HMAC key.
81

82
  @type file_name: str
83
  @param file_name: Path to output file
84

85
  """
86
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
87
                  backup=True)
88

    
89

    
90
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
91
                          new_cds, rapi_cert_pem=None, cds=None,
92
                          nodecert_file=constants.NODED_CERT_FILE,
93
                          rapicert_file=constants.RAPI_CERT_FILE,
94
                          hmackey_file=constants.CONFD_HMAC_KEY,
95
                          cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
96
  """Updates the cluster certificates, keys and secrets.
97

98
  @type new_cluster_cert: bool
99
  @param new_cluster_cert: Whether to generate a new cluster certificate
100
  @type new_rapi_cert: bool
101
  @param new_rapi_cert: Whether to generate a new RAPI certificate
102
  @type new_confd_hmac_key: bool
103
  @param new_confd_hmac_key: Whether to generate a new HMAC key
104
  @type new_cds: bool
105
  @param new_cds: Whether to generate a new cluster domain secret
106
  @type rapi_cert_pem: string
107
  @param rapi_cert_pem: New RAPI certificate in PEM format
108
  @type cds: string
109
  @param cds: New cluster domain secret
110
  @type nodecert_file: string
111
  @param nodecert_file: optional override of the node cert file path
112
  @type rapicert_file: string
113
  @param rapicert_file: optional override of the rapi cert file path
114
  @type hmackey_file: string
115
  @param hmackey_file: optional override of the hmac key file path
116

117
  """
118
  # noded SSL certificate
119
  cluster_cert_exists = os.path.exists(nodecert_file)
120
  if new_cluster_cert or not cluster_cert_exists:
121
    if cluster_cert_exists:
122
      utils.CreateBackup(nodecert_file)
123

    
124
    logging.debug("Generating new cluster certificate at %s", nodecert_file)
125
    utils.GenerateSelfSignedSslCert(nodecert_file)
126

    
127
  # confd HMAC key
128
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
129
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
130
    GenerateHmacKey(hmackey_file)
131

    
132
  # RAPI
133
  rapi_cert_exists = os.path.exists(rapicert_file)
134

    
135
  if rapi_cert_pem:
136
    # Assume rapi_pem contains a valid PEM-formatted certificate and key
137
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
138
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
139

    
140
  elif new_rapi_cert or not rapi_cert_exists:
141
    if rapi_cert_exists:
142
      utils.CreateBackup(rapicert_file)
143

    
144
    logging.debug("Generating new RAPI certificate at %s", rapicert_file)
145
    utils.GenerateSelfSignedSslCert(rapicert_file)
146

    
147
  # Cluster domain secret
148
  if cds:
149
    logging.debug("Writing cluster domain secret to %s", cds_file)
150
    utils.WriteFile(cds_file, data=cds, backup=True)
151

    
152
  elif new_cds or not os.path.exists(cds_file):
153
    logging.debug("Generating new cluster domain secret at %s", cds_file)
154
    GenerateHmacKey(cds_file)
155

    
156

    
157
def _InitGanetiServerSetup(master_name):
158
  """Setup the necessary configuration for the initial node daemon.
159

160
  This creates the nodepass file containing the shared password for
161
  the cluster, generates the SSL certificate and starts the node daemon.
162

163
  @type master_name: str
164
  @param master_name: Name of the master node
165

166
  """
167
  # Generate cluster secrets
168
  GenerateClusterCrypto(True, False, False, False)
169

    
170
  result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
171
  if result.failed:
172
    raise errors.OpExecError("Could not start the node daemon, command %s"
173
                             " had exitcode %s and error %s" %
174
                             (result.cmd, result.exit_code, result.output))
175

    
176
  _WaitForNodeDaemon(master_name)
177

    
178

    
179
def _WaitForNodeDaemon(node_name):
180
  """Wait for node daemon to become responsive.
181

182
  """
183
  def _CheckNodeDaemon():
184
    result = rpc.RpcRunner.call_version([node_name])[node_name]
185
    if result.fail_msg:
186
      raise utils.RetryAgain()
187

    
188
  try:
189
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
190
  except utils.RetryTimeout:
191
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
192
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
193

    
194

    
195
def _WaitForMasterDaemon():
196
  """Wait for master daemon to become responsive.
197

198
  """
199
  def _CheckMasterDaemon():
200
    try:
201
      cl = luxi.Client()
202
      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
203
    except Exception:
204
      raise utils.RetryAgain()
205

    
206
    logging.debug("Received cluster name %s from master", cluster_name)
207

    
208
  try:
209
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
210
  except utils.RetryTimeout:
211
    raise errors.OpExecError("Master daemon didn't answer queries within"
212
                             " %s seconds" % _DAEMON_READY_TIMEOUT)
213

    
214

    
215
def _InitFileStorage(file_storage_dir):
216
  """Initialize if needed the file storage.
217

218
  @param file_storage_dir: the user-supplied value
219
  @return: either empty string (if file storage was disabled at build
220
      time) or the normalized path to the storage directory
221

222
  """
223
  if not constants.ENABLE_FILE_STORAGE:
224
    return ""
225

    
226
  file_storage_dir = os.path.normpath(file_storage_dir)
227

    
228
  if not os.path.isabs(file_storage_dir):
229
    raise errors.OpPrereqError("The file storage directory you passed is"
230
                               " not an absolute path.", errors.ECODE_INVAL)
231

    
232
  if not os.path.exists(file_storage_dir):
233
    try:
234
      os.makedirs(file_storage_dir, 0750)
235
    except OSError, err:
236
      raise errors.OpPrereqError("Cannot create file storage directory"
237
                                 " '%s': %s" % (file_storage_dir, err),
238
                                 errors.ECODE_ENVIRON)
239

    
240
  if not os.path.isdir(file_storage_dir):
241
    raise errors.OpPrereqError("The file storage directory '%s' is not"
242
                               " a directory." % file_storage_dir,
243
                               errors.ECODE_ENVIRON)
244
  return file_storage_dir
245

    
246

    
247
def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
248
                master_netdev, file_storage_dir, candidate_pool_size,
249
                secondary_ip=None, vg_name=None, beparams=None,
250
                nicparams=None, ndparams=None, hvparams=None,
251
                enabled_hypervisors=None, modify_etc_hosts=True,
252
                modify_ssh_setup=True, maintain_node_health=False,
253
                drbd_helper=None, uid_pool=None, default_iallocator=None,
254
                primary_ip_version=None, prealloc_wipe_disks=False):
255
  """Initialise the cluster.
256

257
  @type candidate_pool_size: int
258
  @param candidate_pool_size: master candidate pool size
259

260
  """
261
  # TODO: complete the docstring
262
  if config.ConfigWriter.IsCluster():
263
    raise errors.OpPrereqError("Cluster is already initialised",
264
                               errors.ECODE_STATE)
265

    
266
  if not enabled_hypervisors:
267
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
268
                               " least one member", errors.ECODE_INVAL)
269
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
270
  if invalid_hvs:
271
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
272
                               " entries: %s" % invalid_hvs,
273
                               errors.ECODE_INVAL)
274

    
275

    
276
  ipcls = None
277
  if primary_ip_version == constants.IP4_VERSION:
278
    ipcls = netutils.IP4Address
279
  elif primary_ip_version == constants.IP6_VERSION:
280
    ipcls = netutils.IP6Address
281
  else:
282
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
283
                               primary_ip_version)
284

    
285
  hostname = netutils.GetHostname(family=ipcls.family)
286
  if not ipcls.IsValid(hostname.ip):
287
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
288
                               " address." % (hostname.ip, primary_ip_version))
289

    
290
  if ipcls.IsLoopback(hostname.ip):
291
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
292
                               " address. Please fix DNS or %s." %
293
                               (hostname.ip, constants.ETC_HOSTS),
294
                               errors.ECODE_ENVIRON)
295

    
296
  if not ipcls.Own(hostname.ip):
297
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
298
                               " to %s,\nbut this ip address does not"
299
                               " belong to this host" %
300
                               hostname.ip, errors.ECODE_ENVIRON)
301

    
302
  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
303

    
304
  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
305
    raise errors.OpPrereqError("Cluster IP already active",
306
                               errors.ECODE_NOTUNIQUE)
307

    
308
  if not secondary_ip:
309
    if primary_ip_version == constants.IP6_VERSION:
310
      raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
311
                                 " IPv4 address must be given as secondary",
312
                                 errors.ECODE_INVAL)
313
    secondary_ip = hostname.ip
314

    
315
  if not netutils.IP4Address.IsValid(secondary_ip):
316
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
317
                               " IPv4 address." % secondary_ip,
318
                               errors.ECODE_INVAL)
319

    
320
  if not netutils.IP4Address.Own(secondary_ip):
321
    raise errors.OpPrereqError("You gave %s as secondary IP,"
322
                               " but it does not belong to this host." %
323
                               secondary_ip, errors.ECODE_ENVIRON)
324

    
325
  if vg_name is not None:
326
    # Check if volume group is valid
327
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
328
                                          constants.MIN_VG_SIZE)
329
    if vgstatus:
330
      raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
331
                                 " you are not using lvm" % vgstatus,
332
                                 errors.ECODE_INVAL)
333

    
334
  if drbd_helper is not None:
335
    try:
336
      curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
337
    except errors.BlockDeviceError, err:
338
      raise errors.OpPrereqError("Error while checking drbd helper"
339
                                 " (specify --no-drbd-storage if you are not"
340
                                 " using drbd): %s" % str(err),
341
                                 errors.ECODE_ENVIRON)
342
    if drbd_helper != curr_helper:
343
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
344
                                 " is the current helper" % (drbd_helper,
345
                                                             curr_helper),
346
                                 errors.ECODE_INVAL)
347

    
348
  file_storage_dir = _InitFileStorage(file_storage_dir)
349

    
350
  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
351
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
352
                               errors.ECODE_INVAL)
353

    
354
  result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
355
  if result.failed:
356
    raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
357
                               (master_netdev,
358
                                result.output.strip()), errors.ECODE_INVAL)
359

    
360
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
361
  utils.EnsureDirs(dirs)
362

    
363
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
364
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
365
  objects.NIC.CheckParameterSyntax(nicparams)
366

    
367
  if ndparams is not None:
368
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
369
  else:
370
    ndparams = dict(constants.NDC_DEFAULTS)
371

    
372
  # hvparams is a mapping of hypervisor->hvparams dict
373
  for hv_name, hv_params in hvparams.iteritems():
374
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
375
    hv_class = hypervisor.GetHypervisor(hv_name)
376
    hv_class.CheckParameterSyntax(hv_params)
377

    
378
  # set up ssh config and /etc/hosts
379
  sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
380
  sshkey = sshline.split(" ")[1]
381

    
382
  if modify_etc_hosts:
383
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)
384

    
385
  if modify_ssh_setup:
386
    _InitSSHSetup()
387

    
388
  if default_iallocator is not None:
389
    alloc_script = utils.FindFile(default_iallocator,
390
                                  constants.IALLOCATOR_SEARCH_PATH,
391
                                  os.path.isfile)
392
    if alloc_script is None:
393
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
394
                                 " specified" % default_iallocator,
395
                                 errors.ECODE_INVAL)
396

    
397
  now = time.time()
398

    
399
  # init of cluster config file
400
  cluster_config = objects.Cluster(
401
    serial_no=1,
402
    rsahostkeypub=sshkey,
403
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
404
    mac_prefix=mac_prefix,
405
    volume_group_name=vg_name,
406
    tcpudp_port_pool=set(),
407
    master_node=hostname.name,
408
    master_ip=clustername.ip,
409
    master_netdev=master_netdev,
410
    cluster_name=clustername.name,
411
    file_storage_dir=file_storage_dir,
412
    enabled_hypervisors=enabled_hypervisors,
413
    beparams={constants.PP_DEFAULT: beparams},
414
    nicparams={constants.PP_DEFAULT: nicparams},
415
    ndparams=ndparams,
416
    hvparams=hvparams,
417
    candidate_pool_size=candidate_pool_size,
418
    modify_etc_hosts=modify_etc_hosts,
419
    modify_ssh_setup=modify_ssh_setup,
420
    uid_pool=uid_pool,
421
    ctime=now,
422
    mtime=now,
423
    maintain_node_health=maintain_node_health,
424
    drbd_usermode_helper=drbd_helper,
425
    default_iallocator=default_iallocator,
426
    primary_ip_family=ipcls.family,
427
    prealloc_wipe_disks=prealloc_wipe_disks,
428
    )
429
  master_node_config = objects.Node(name=hostname.name,
430
                                    primary_ip=hostname.ip,
431
                                    secondary_ip=secondary_ip,
432
                                    serial_no=1,
433
                                    master_candidate=True,
434
                                    offline=False, drained=False,
435
                                    ctime=now, mtime=now,
436
                                    )
437
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
438
  cfg = config.ConfigWriter(offline=True)
439
  ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
440
  cfg.Update(cfg.GetClusterInfo(), logging.error)
441
  backend.WriteSsconfFiles(cfg.GetSsconfValues())
442

    
443
  # set up the inter-node password and certificate
444
  _InitGanetiServerSetup(hostname.name)
445

    
446
  logging.debug("Starting daemons")
447
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
448
  if result.failed:
449
    raise errors.OpExecError("Could not start daemons, command %s"
450
                             " had exitcode %s and error %s" %
451
                             (result.cmd, result.exit_code, result.output))
452

    
453
  _WaitForMasterDaemon()
454

    
455

    
456
def InitConfig(version, cluster_config, master_node_config,
457
               cfg_file=constants.CLUSTER_CONF_FILE):
458
  """Create the initial cluster configuration.
459

460
  It will contain the current node, which will also be the master
461
  node, and no instances.
462

463
  @type version: int
464
  @param version: configuration version
465
  @type cluster_config: L{objects.Cluster}
466
  @param cluster_config: cluster configuration
467
  @type master_node_config: L{objects.Node}
468
  @param master_node_config: master node configuration
469
  @type cfg_file: string
470
  @param cfg_file: configuration file path
471

472
  """
473
  uuid_generator = config.TemporaryReservationManager()
474
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
475
                                                _INITCONF_ECID)
476
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
477
                                                    _INITCONF_ECID)
478
  nodes = {
479
    master_node_config.name: master_node_config,
480
    }
481
  default_nodegroup = objects.NodeGroup(
482
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
483
    name=constants.INITIAL_NODE_GROUP_NAME,
484
    members=[master_node_config.name],
485
    )
486
  nodegroups = {
487
    default_nodegroup.uuid: default_nodegroup,
488
    }
489
  now = time.time()
490
  config_data = objects.ConfigData(version=version,
491
                                   cluster=cluster_config,
492
                                   nodegroups=nodegroups,
493
                                   nodes=nodes,
494
                                   instances={},
495
                                   serial_no=1,
496
                                   ctime=now, mtime=now)
497
  utils.WriteFile(cfg_file,
498
                  data=serializer.Dump(config_data.ToDict()),
499
                  mode=0600)
500

    
501

    
502
def FinalizeClusterDestroy(master):
503
  """Execute the last steps of cluster destroy
504

505
  This function shuts down all the daemons, completing the destroy
506
  begun in cmdlib.LUDestroyOpcode.
507

508
  """
509
  cfg = config.ConfigWriter()
510
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
511
  result = rpc.RpcRunner.call_node_stop_master(master, True)
512
  msg = result.fail_msg
513
  if msg:
514
    logging.warning("Could not disable the master role: %s", msg)
515
  result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
516
  msg = result.fail_msg
517
  if msg:
518
    logging.warning("Could not shutdown the node daemon and cleanup"
519
                    " the node: %s", msg)
520

    
521

    
522
def SetupNodeDaemon(cluster_name, node, ssh_key_check):
523
  """Add a node to the cluster.
524

525
  This function must be called before the actual opcode, and will ssh
526
  to the remote node, copy the needed files, and start ganeti-noded,
527
  allowing the master to do the rest via normal rpc calls.
528

529
  @param cluster_name: the cluster name
530
  @param node: the name of the new node
531
  @param ssh_key_check: whether to do a strict key check
532

533
  """
534
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
535
  sshrunner = ssh.SshRunner(cluster_name,
536
                            ipv6=(family == netutils.IP6Address.family))
537

    
538
  bind_address = constants.IP4_ADDRESS_ANY
539
  if family == netutils.IP6Address.family:
540
    bind_address = constants.IP6_ADDRESS_ANY
541

    
542
  # set up inter-node password and certificate and restarts the node daemon
543
  # and then connect with ssh to set password and start ganeti-noded
544
  # note that all the below variables are sanitized at this point,
545
  # either by being constants or by the checks above
546
  sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
547
  sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
548
  sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
549
  mycommand = ("%s stop-all; %s start %s -b %s" %
550
               (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
551
                utils.ShellQuote(bind_address)))
552

    
553
  result = sshrunner.Run(node, 'root', mycommand, batch=False,
554
                         ask_key=ssh_key_check,
555
                         use_cluster_key=True,
556
                         strict_host_check=ssh_key_check)
557
  if result.failed:
558
    raise errors.OpExecError("Remote command on node %s, error: %s,"
559
                             " output: %s" %
560
                             (node, result.fail_reason, result.output))
561

    
562
  _WaitForNodeDaemon(node)
563

    
564

    
565
def MasterFailover(no_voting=False):
566
  """Failover the master node.
567

568
  This checks that we are not already the master, and will cause the
569
  current master to cease being master, and the non-master to become
570
  new master.
571

572
  @type no_voting: boolean
573
  @param no_voting: force the operation without remote nodes agreement
574
                      (dangerous)
575

576
  """
577
  sstore = ssconf.SimpleStore()
578

    
579
  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
580
  node_list = sstore.GetNodeList()
581
  mc_list = sstore.GetMasterCandidates()
582

    
583
  if old_master == new_master:
584
    raise errors.OpPrereqError("This commands must be run on the node"
585
                               " where you want the new master to be."
586
                               " %s is already the master" %
587
                               old_master, errors.ECODE_INVAL)
588

    
589
  if new_master not in mc_list:
590
    mc_no_master = [name for name in mc_list if name != old_master]
591
    raise errors.OpPrereqError("This node is not among the nodes marked"
592
                               " as master candidates. Only these nodes"
593
                               " can become masters. Current list of"
594
                               " master candidates is:\n"
595
                               "%s" % ('\n'.join(mc_no_master)),
596
                               errors.ECODE_STATE)
597

    
598
  if not no_voting:
599
    vote_list = GatherMasterVotes(node_list)
600

    
601
    if vote_list:
602
      voted_master = vote_list[0][0]
603
      if voted_master is None:
604
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
605
                                   " not respond.", errors.ECODE_ENVIRON)
606
      elif voted_master != old_master:
607
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
608
                                   " the master is %s but the other nodes"
609
                                   " voted %s. Please resync the configuration"
610
                                   " of this node." %
611
                                   (old_master, voted_master),
612
                                   errors.ECODE_STATE)
613
  # end checks
614

    
615
  rcode = 0
616

    
617
  logging.info("Setting master to %s, old master: %s", new_master, old_master)
618

    
619
  try:
620
    # instantiate a real config writer, as we now know we have the
621
    # configuration data
622
    cfg = config.ConfigWriter(accept_foreign=True)
623

    
624
    cluster_info = cfg.GetClusterInfo()
625
    cluster_info.master_node = new_master
626
    # this will also regenerate the ssconf files, since we updated the
627
    # cluster info
628
    cfg.Update(cluster_info, logging.error)
629
  except errors.ConfigurationError, err:
630
    logging.error("Error while trying to set the new master: %s",
631
                  str(err))
632
    return 1
633

    
634
  # if cfg.Update worked, then it means the old master daemon won't be
635
  # able now to write its own config file (we rely on locking in both
636
  # backend.UploadFile() and ConfigWriter._Write(); hence the next
637
  # step is to kill the old master
638

    
639
  logging.info("Stopping the master daemon on node %s", old_master)
640

    
641
  result = rpc.RpcRunner.call_node_stop_master(old_master, True)
642
  msg = result.fail_msg
643
  if msg:
644
    logging.error("Could not disable the master role on the old master"
645
                 " %s, please disable manually: %s", old_master, msg)
646

    
647
  logging.info("Checking master IP non-reachability...")
648

    
649
  master_ip = sstore.GetMasterIP()
650
  total_timeout = 30
651
  # Here we have a phase where no master should be running
652
  def _check_ip():
653
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
654
      raise utils.RetryAgain()
655

    
656
  try:
657
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
658
  except utils.RetryTimeout:
659
    logging.warning("The master IP is still reachable after %s seconds,"
660
                    " continuing but activating the master on the current"
661
                    " node will probably fail", total_timeout)
662

    
663
  logging.info("Starting the master daemons on the new master")
664

    
665
  result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
666
  msg = result.fail_msg
667
  if msg:
668
    logging.error("Could not start the master role on the new master"
669
                  " %s, please check: %s", new_master, msg)
670
    rcode = 1
671

    
672
  logging.info("Master failed over from %s to %s", old_master, new_master)
673
  return rcode
674

    
675

    
676
def GetMaster():
677
  """Returns the current master node.
678

679
  This is a separate function in bootstrap since it's needed by
680
  gnt-cluster, and instead of importing directly ssconf, it's better
681
  to abstract it in bootstrap, where we do use ssconf in other
682
  functions too.
683

684
  """
685
  sstore = ssconf.SimpleStore()
686

    
687
  old_master, _ = ssconf.GetMasterAndMyself(sstore)
688

    
689
  return old_master
690

    
691

    
692
def GatherMasterVotes(node_list):
693
  """Check the agreement on who is the master.
694

695
  This function will return a list of (node, number of votes), ordered
696
  by the number of votes. Errors will be denoted by the key 'None'.
697

698
  Note that the sum of votes is the number of nodes this machine
699
  knows, whereas the number of entries in the list could be different
700
  (if some nodes vote for another master).
701

702
  We remove ourselves from the list since we know that (bugs aside)
703
  since we use the same source for configuration information for both
704
  backend and boostrap, we'll always vote for ourselves.
705

706
  @type node_list: list
707
  @param node_list: the list of nodes to query for master info; the current
708
      node will be removed if it is in the list
709
  @rtype: list
710
  @return: list of (node, votes)
711

712
  """
713
  myself = netutils.Hostname.GetSysName()
714
  try:
715
    node_list.remove(myself)
716
  except ValueError:
717
    pass
718
  if not node_list:
719
    # no nodes left (eventually after removing myself)
720
    return []
721
  results = rpc.RpcRunner.call_master_info(node_list)
722
  if not isinstance(results, dict):
723
    # this should not happen (unless internal error in rpc)
724
    logging.critical("Can't complete rpc call, aborting master startup")
725
    return [(None, len(node_list))]
726
  votes = {}
727
  for node in results:
728
    nres = results[node]
729
    data = nres.payload
730
    msg = nres.fail_msg
731
    fail = False
732
    if msg:
733
      logging.warning("Error contacting node %s: %s", node, msg)
734
      fail = True
735
    # for now we accept both length 3 and 4 (data[3] is primary ip version)
736
    elif not isinstance(data, (tuple, list)) or len(data) < 3:
737
      logging.warning("Invalid data received from node %s: %s", node, data)
738
      fail = True
739
    if fail:
740
      if None not in votes:
741
        votes[None] = 0
742
      votes[None] += 1
743
      continue
744
    master_node = data[2]
745
    if master_node not in votes:
746
      votes[master_node] = 0
747
    votes[master_node] += 1
748

    
749
  vote_list = [v for v in votes.items()]
750
  # sort first on number of votes then on name, since we want None
751
  # sorted later if we have the half of the nodes not responding, and
752
  # half voting all for the same master
753
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
754

    
755
  return vote_list