Statistics
| Branch: | Tag: | Revision:

root / lib / bootstrap.py @ 4b97f902

History | View | Annotate | Download (27.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions to bootstrap a new cluster.
23

24
"""
25

    
26
import os
27
import os.path
28
import re
29
import logging
30
import time
31

    
32
from ganeti import rpc
33
from ganeti import ssh
34
from ganeti import utils
35
from ganeti import errors
36
from ganeti import config
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40
from ganeti import serializer
41
from ganeti import hypervisor
42
from ganeti import bdev
43
from ganeti import netutils
44
from ganeti import backend
45
from ganeti import luxi
46
from ganeti import jstore
47

    
48

    
49
# ec_id for InitConfig's temporary reservation manager
50
_INITCONF_ECID = "initconfig-ecid"
51

    
52
#: After how many seconds daemon must be responsive
53
_DAEMON_READY_TIMEOUT = 10.0
54

    
55

    
56
def _InitSSHSetup():
  """Setup the SSH configuration for the cluster.

  This generates a dsa keypair for root, adds the pub key to the
  permitted hosts and adds the hostkey to its own known hosts.

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # Back up any pre-existing key material, then remove it so ssh-keygen
  # can write a fresh pair without prompting
  for key_path in (priv_key, pub_key):
    if os.path.exists(key_path):
      utils.CreateBackup(key_path)
    utils.RemoveFile(key_path)

  keygen = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if keygen.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             keygen.output)

  # Allow the new public key to log in as the Ganeti user
  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
78

    
79

    
80
def GenerateHmacKey(file_name):
81
  """Writes a new HMAC key.
82

83
  @type file_name: str
84
  @param file_name: Path to output file
85

86
  """
87
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
88
                  backup=True)
89

    
90

    
91
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
                          new_cds, rapi_cert_pem=None, cds=None,
                          nodecert_file=constants.NODED_CERT_FILE,
                          rapicert_file=constants.RAPI_CERT_FILE,
                          hmackey_file=constants.CONFD_HMAC_KEY,
                          cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
  """Updates the cluster certificates, keys and secrets.

  Each artifact is (re)generated when its "new_*" flag is set or when the
  file is missing; existing certificates are backed up before replacement.
  A caller-supplied value (C{rapi_cert_pem}, C{cds}) takes precedence over
  generating a new one.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type rapi_cert_pem: string
  @param rapi_cert_pem: New RAPI certificate in PEM format
  @type cds: string
  @param cds: New cluster domain secret
  @type nodecert_file: string
  @param nodecert_file: optional override of the node cert file path
  @type rapicert_file: string
  @param rapicert_file: optional override of the rapi cert file path
  @type hmackey_file: string
  @param hmackey_file: optional override of the hmac key file path
  @type cds_file: string
  @param cds_file: optional override of the cluster domain secret file path

  """
  # noded SSL certificate
  cluster_cert_exists = os.path.exists(nodecert_file)
  if new_cluster_cert or not cluster_cert_exists:
    if cluster_cert_exists:
      utils.CreateBackup(nodecert_file)

    logging.debug("Generating new cluster certificate at %s", nodecert_file)
    utils.GenerateSelfSignedSslCert(nodecert_file)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  # RAPI
  rapi_cert_exists = os.path.exists(rapicert_file)

  if rapi_cert_pem:
    # Assume rapi_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  elif new_rapi_cert or not rapi_cert_exists:
    if rapi_cert_exists:
      utils.CreateBackup(rapicert_file)

    logging.debug("Generating new RAPI certificate at %s", rapicert_file)
    utils.GenerateSelfSignedSslCert(rapicert_file)

  # Cluster domain secret; same precedence rule as the RAPI certificate
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)
156

    
157

    
158
def _InitGanetiServerSetup(master_name):
  """Setup the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster, generates the SSL certificate and starts the node daemon.

  @type master_name: str
  @param master_name: Name of the master node

  """
  # Generate cluster secrets: only the cluster certificate is forced,
  # everything else is created only if missing
  GenerateClusterCrypto(True, False, False, False)

  start_result = utils.RunCmd([constants.DAEMON_UTIL, "start",
                               constants.NODED])
  if start_result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (start_result.cmd, start_result.exit_code,
                              start_result.output))

  # Block until noded answers RPC queries (or raise after the timeout)
  _WaitForNodeDaemon(master_name)
178

    
179

    
180
def _WaitForNodeDaemon(node_name):
  """Wait for node daemon to become responsive.

  """
  def _CheckNodeDaemon():
    # A failing "version" RPC means noded is not reachable yet
    if rpc.RpcRunner.call_version([node_name])[node_name].fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
194

    
195

    
196
def _WaitForMasterDaemon():
  """Wait for master daemon to become responsive.

  """
  def _CheckMasterDaemon():
    # Any failure (connection refused, bad reply, ...) just means the
    # master daemon is not ready yet, so keep retrying
    try:
      (cluster_name, ) = luxi.Client().QueryConfigValues(["cluster_name"])
    except Exception:
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", cluster_name)

  try:
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)
214

    
215

    
216
def _InitFileStorage(file_storage_dir):
  """Initialize if needed the file storage.

  @type file_storage_dir: str
  @param file_storage_dir: the user-supplied value
  @rtype: str
  @return: either empty string (if file storage was disabled at build
      time) or the normalized path to the storage directory
  @raise errors.OpPrereqError: if the path is relative, cannot be
      created, or exists but is not a directory

  """
  if not constants.ENABLE_FILE_STORAGE:
    return ""

  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory you passed is"
                               " not an absolute path.", errors.ECODE_INVAL)

  # Create the directory (including parents) only if nothing exists at
  # that path; mode 0750 keeps instance disk files group-readable at most
  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  # The path may have pre-existed as a regular file/symlink/etc.
  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)
  return file_storage_dir
246

    
247

    
248
def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
249
                master_netdev, file_storage_dir, candidate_pool_size,
250
                secondary_ip=None, vg_name=None, beparams=None,
251
                nicparams=None, ndparams=None, hvparams=None,
252
                enabled_hypervisors=None, modify_etc_hosts=True,
253
                modify_ssh_setup=True, maintain_node_health=False,
254
                drbd_helper=None, uid_pool=None, default_iallocator=None,
255
                primary_ip_version=None, prealloc_wipe_disks=False):
256
  """Initialise the cluster.
257

258
  @type candidate_pool_size: int
259
  @param candidate_pool_size: master candidate pool size
260

261
  """
262
  # TODO: complete the docstring
263
  if config.ConfigWriter.IsCluster():
264
    raise errors.OpPrereqError("Cluster is already initialised",
265
                               errors.ECODE_STATE)
266

    
267
  if not enabled_hypervisors:
268
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
269
                               " least one member", errors.ECODE_INVAL)
270
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
271
  if invalid_hvs:
272
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
273
                               " entries: %s" % invalid_hvs,
274
                               errors.ECODE_INVAL)
275

    
276

    
277
  ipcls = None
278
  if primary_ip_version == constants.IP4_VERSION:
279
    ipcls = netutils.IP4Address
280
  elif primary_ip_version == constants.IP6_VERSION:
281
    ipcls = netutils.IP6Address
282
  else:
283
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
284
                               primary_ip_version)
285

    
286
  hostname = netutils.GetHostname(family=ipcls.family)
287
  if not ipcls.IsValid(hostname.ip):
288
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
289
                               " address." % (hostname.ip, primary_ip_version))
290

    
291
  if ipcls.IsLoopback(hostname.ip):
292
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
293
                               " address. Please fix DNS or %s." %
294
                               (hostname.ip, constants.ETC_HOSTS),
295
                               errors.ECODE_ENVIRON)
296

    
297
  if not ipcls.Own(hostname.ip):
298
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
299
                               " to %s,\nbut this ip address does not"
300
                               " belong to this host" %
301
                               hostname.ip, errors.ECODE_ENVIRON)
302

    
303
  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
304

    
305
  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
306
    raise errors.OpPrereqError("Cluster IP already active",
307
                               errors.ECODE_NOTUNIQUE)
308

    
309
  if not secondary_ip:
310
    if primary_ip_version == constants.IP6_VERSION:
311
      raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
312
                                 " IPv4 address must be given as secondary",
313
                                 errors.ECODE_INVAL)
314
    secondary_ip = hostname.ip
315

    
316
  if not netutils.IP4Address.IsValid(secondary_ip):
317
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
318
                               " IPv4 address." % secondary_ip,
319
                               errors.ECODE_INVAL)
320

    
321
  if not netutils.IP4Address.Own(secondary_ip):
322
    raise errors.OpPrereqError("You gave %s as secondary IP,"
323
                               " but it does not belong to this host." %
324
                               secondary_ip, errors.ECODE_ENVIRON)
325

    
326
  if vg_name is not None:
327
    # Check if volume group is valid
328
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
329
                                          constants.MIN_VG_SIZE)
330
    if vgstatus:
331
      raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
332
                                 " you are not using lvm" % vgstatus,
333
                                 errors.ECODE_INVAL)
334

    
335
  if drbd_helper is not None:
336
    try:
337
      curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
338
    except errors.BlockDeviceError, err:
339
      raise errors.OpPrereqError("Error while checking drbd helper"
340
                                 " (specify --no-drbd-storage if you are not"
341
                                 " using drbd): %s" % str(err),
342
                                 errors.ECODE_ENVIRON)
343
    if drbd_helper != curr_helper:
344
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
345
                                 " is the current helper" % (drbd_helper,
346
                                                             curr_helper),
347
                                 errors.ECODE_INVAL)
348

    
349
  file_storage_dir = _InitFileStorage(file_storage_dir)
350

    
351
  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
352
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
353
                               errors.ECODE_INVAL)
354

    
355
  result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
356
  if result.failed:
357
    raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
358
                               (master_netdev,
359
                                result.output.strip()), errors.ECODE_INVAL)
360

    
361
  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
362
  utils.EnsureDirs(dirs)
363

    
364
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
365
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
366
  objects.NIC.CheckParameterSyntax(nicparams)
367

    
368
  if ndparams is not None:
369
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
370
  else:
371
    ndparams = dict(constants.NDC_DEFAULTS)
372

    
373
  # hvparams is a mapping of hypervisor->hvparams dict
374
  for hv_name, hv_params in hvparams.iteritems():
375
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
376
    hv_class = hypervisor.GetHypervisor(hv_name)
377
    hv_class.CheckParameterSyntax(hv_params)
378

    
379
  # set up ssh config and /etc/hosts
380
  sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
381
  sshkey = sshline.split(" ")[1]
382

    
383
  if modify_etc_hosts:
384
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)
385

    
386
  if modify_ssh_setup:
387
    _InitSSHSetup()
388

    
389
  if default_iallocator is not None:
390
    alloc_script = utils.FindFile(default_iallocator,
391
                                  constants.IALLOCATOR_SEARCH_PATH,
392
                                  os.path.isfile)
393
    if alloc_script is None:
394
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
395
                                 " specified" % default_iallocator,
396
                                 errors.ECODE_INVAL)
397

    
398
  now = time.time()
399

    
400
  # init of cluster config file
401
  cluster_config = objects.Cluster(
402
    serial_no=1,
403
    rsahostkeypub=sshkey,
404
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
405
    mac_prefix=mac_prefix,
406
    volume_group_name=vg_name,
407
    tcpudp_port_pool=set(),
408
    master_node=hostname.name,
409
    master_ip=clustername.ip,
410
    master_netdev=master_netdev,
411
    cluster_name=clustername.name,
412
    file_storage_dir=file_storage_dir,
413
    shared_file_storage_dir=shared_file_storage_dir,
414
    enabled_hypervisors=enabled_hypervisors,
415
    beparams={constants.PP_DEFAULT: beparams},
416
    nicparams={constants.PP_DEFAULT: nicparams},
417
    ndparams=ndparams,
418
    hvparams=hvparams,
419
    candidate_pool_size=candidate_pool_size,
420
    modify_etc_hosts=modify_etc_hosts,
421
    modify_ssh_setup=modify_ssh_setup,
422
    uid_pool=uid_pool,
423
    ctime=now,
424
    mtime=now,
425
    maintain_node_health=maintain_node_health,
426
    drbd_usermode_helper=drbd_helper,
427
    default_iallocator=default_iallocator,
428
    primary_ip_family=ipcls.family,
429
    prealloc_wipe_disks=prealloc_wipe_disks,
430
    )
431
  master_node_config = objects.Node(name=hostname.name,
432
                                    primary_ip=hostname.ip,
433
                                    secondary_ip=secondary_ip,
434
                                    serial_no=1,
435
                                    master_candidate=True,
436
                                    offline=False, drained=False,
437
                                    ctime=now, mtime=now,
438
                                    )
439
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
440
  cfg = config.ConfigWriter(offline=True)
441
  ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
442
  cfg.Update(cfg.GetClusterInfo(), logging.error)
443
  backend.WriteSsconfFiles(cfg.GetSsconfValues())
444

    
445
  # set up the inter-node password and certificate
446
  _InitGanetiServerSetup(hostname.name)
447

    
448
  logging.debug("Starting daemons")
449
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
450
  if result.failed:
451
    raise errors.OpExecError("Could not start daemons, command %s"
452
                             " had exitcode %s and error %s" %
453
                             (result.cmd, result.exit_code, result.output))
454

    
455
  _WaitForMasterDaemon()
456

    
457

    
458
def InitConfig(version, cluster_config, master_node_config,
459
               cfg_file=constants.CLUSTER_CONF_FILE):
460
  """Create the initial cluster configuration.
461

462
  It will contain the current node, which will also be the master
463
  node, and no instances.
464

465
  @type version: int
466
  @param version: configuration version
467
  @type cluster_config: L{objects.Cluster}
468
  @param cluster_config: cluster configuration
469
  @type master_node_config: L{objects.Node}
470
  @param master_node_config: master node configuration
471
  @type cfg_file: string
472
  @param cfg_file: configuration file path
473

474
  """
475
  uuid_generator = config.TemporaryReservationManager()
476
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
477
                                                _INITCONF_ECID)
478
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
479
                                                    _INITCONF_ECID)
480
  nodes = {
481
    master_node_config.name: master_node_config,
482
    }
483
  default_nodegroup = objects.NodeGroup(
484
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
485
    name=constants.INITIAL_NODE_GROUP_NAME,
486
    members=[master_node_config.name],
487
    )
488
  nodegroups = {
489
    default_nodegroup.uuid: default_nodegroup,
490
    }
491
  now = time.time()
492
  config_data = objects.ConfigData(version=version,
493
                                   cluster=cluster_config,
494
                                   nodegroups=nodegroups,
495
                                   nodes=nodes,
496
                                   instances={},
497
                                   serial_no=1,
498
                                   ctime=now, mtime=now)
499
  utils.WriteFile(cfg_file,
500
                  data=serializer.Dump(config_data.ToDict()),
501
                  mode=0600)
502

    
503

    
504
def FinalizeClusterDestroy(master):
  """Execute the last steps of cluster destroy

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  """
  # Read the ssh-modification flag before tearing anything down
  cfg = config.ConfigWriter()
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup

  stop_result = rpc.RpcRunner.call_node_stop_master(master, True)
  if stop_result.fail_msg:
    logging.warning("Could not disable the master role: %s",
                    stop_result.fail_msg)

  leave_result = rpc.RpcRunner.call_node_leave_cluster(master,
                                                       modify_ssh_setup)
  if leave_result.fail_msg:
    logging.warning("Could not shutdown the node daemon and cleanup"
                    " the node: %s", leave_result.fail_msg)
522

    
523

    
524
def SetupNodeDaemon(cluster_name, node, ssh_key_check):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_key_check: whether to do a strict key check

  """
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  use_ipv6 = (family == netutils.IP6Address.family)
  sshrunner = ssh.SshRunner(cluster_name, ipv6=use_ipv6)

  if use_ipv6:
    bind_address = constants.IP6_ADDRESS_ANY
  else:
    bind_address = constants.IP4_ADDRESS_ANY

  # set up inter-node password and certificate and restarts the node daemon
  # and then connect with ssh to set password and start ganeti-noded
  # note that all the below variables are sanitized at this point,
  # either by being constants or by the checks above
  for secret_file in (constants.NODED_CERT_FILE,
                      constants.RAPI_CERT_FILE,
                      constants.CONFD_HMAC_KEY):
    sshrunner.CopyFileToNode(node, secret_file)

  mycommand = ("%s stop-all; %s start %s -b %s" %
               (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
                utils.ShellQuote(bind_address)))

  result = sshrunner.Run(node, 'root', mycommand, batch=False,
                         ask_key=ssh_key_check,
                         use_cluster_key=True,
                         strict_host_check=ssh_key_check)
  if result.failed:
    raise errors.OpExecError("Remote command on node %s, error: %s,"
                             " output: %s" %
                             (node, result.fail_reason, result.output))

  _WaitForNodeDaemon(node)
565

    
566

    
567
def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the non-master to become
  new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
                      (dangerous)
  @rtype: int
  @return: 0 on success, 1 if updating the configuration or starting
      the master role on the new master fails

  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_list = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  # This command runs on the prospective NEW master, not the old one
  if old_master == new_master:
    raise errors.OpPrereqError("This commands must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ('\n'.join(mc_no_master)),
                               errors.ECODE_STATE)

  if not no_voting:
    vote_list = GatherMasterVotes(node_list)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        # A majority of nodes failed to answer the master-info RPC
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # instantiate a real config writer, as we now know we have the
    # configuration data
    cfg = config.ConfigWriter(accept_foreign=True)

    cluster_info = cfg.GetClusterInfo()
    cluster_info.master_node = new_master
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1

  # if cfg.Update worked, then it means the old master daemon won't be
  # able now to write its own config file (we rely on locking in both
  # backend.UploadFile() and ConfigWriter._Write(); hence the next
  # step is to kill the old master

  logging.info("Stopping the master daemon on node %s", old_master)

  result = rpc.RpcRunner.call_node_stop_master(old_master, True)
  msg = result.fail_msg
  if msg:
    # Best-effort: failover continues even if the old master daemon
    # could not be stopped remotely
    logging.error("Could not disable the master role on the old master"
                 " %s, please disable manually: %s", old_master, msg)

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30
  # Here we have a phase where no master should be running
  def _check_ip():
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      raise utils.RetryAgain()

  try:
    # Retry with increasing delays (1s, 1.5s, then 5s) up to the timeout
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
  except utils.RetryTimeout:
    logging.warning("The master IP is still reachable after %s seconds,"
                    " continuing but activating the master on the current"
                    " node will probably fail", total_timeout)

  if jstore.CheckDrainFlag():
    logging.info("Undraining job queue")
    jstore.SetDrainFlag(False)

  logging.info("Starting the master daemons on the new master")

  result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode
680

    
681

    
682
def GetMaster():
  """Returns the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing directly ssconf, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  """
  # Only the master's name is of interest here; our own name is discarded
  master, _ = ssconf.GetMasterAndMyself(ssconf.SimpleStore())
  return master
696

    
697

    
698
def GatherMasterVotes(node_list):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  We remove ourselves from the list since we know that (bugs aside)
  since we use the same source for configuration information for both
  backend and boostrap, we'll always vote for ourselves.

  @type node_list: list
  @param node_list: the list of nodes to query for master info; the current
      node will be removed if it is in the list
  @rtype: list
  @return: list of (node, votes)

  """
  myself = netutils.Hostname.GetSysName()
  # Drop ourselves from the queried list (see docstring)
  if myself in node_list:
    node_list.remove(myself)
  if not node_list:
    # no nodes left (eventually after removing myself)
    return []

  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_list))]

  votes = {}
  for node in results:
    nres = results[node]
    data = nres.payload
    msg = nres.fail_msg
    if msg:
      logging.warning("Error contacting node %s: %s", node, msg)
      voted_for = None
    elif not isinstance(data, (tuple, list)) or len(data) < 3:
      # for now we accept both length 3 and 4 (data[3] is primary ip version)
      logging.warning("Invalid data received from node %s: %s", node, data)
      voted_for = None
    else:
      voted_for = data[2]
    # None accumulates all failed/invalid answers
    votes[voted_for] = votes.get(voted_for, 0) + 1

  vote_list = list(votes.items())
  # sort first on number of votes then on name, since we want None
  # sorted later if we have the half of the nodes not responding, and
  # half voting all for the same master
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)

  return vote_list