Statistics
| Branch: | Tag: | Revision:

root / lib / bootstrap.py @ 3b6b6129

History | View | Annotate | Download (27.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions to bootstrap a new cluster.
23

24
"""
25

    
26
import os
27
import os.path
28
import re
29
import logging
30
import time
31

    
32
from ganeti import rpc
33
from ganeti import ssh
34
from ganeti import utils
35
from ganeti import errors
36
from ganeti import config
37
from ganeti import constants
38
from ganeti import objects
39
from ganeti import ssconf
40
from ganeti import serializer
41
from ganeti import hypervisor
42
from ganeti import bdev
43
from ganeti import netutils
44
from ganeti import backend
45
from ganeti import luxi
46

    
47

    
48
# ec_id for InitConfig's temporary reservation manager
49
_INITCONF_ECID = "initconfig-ecid"
50

    
51
#: After how many seconds daemon must be responsive
52
_DAEMON_READY_TIMEOUT = 10.0
53

    
54

    
55
def _InitSSHSetup():
  """Set up the SSH configuration for the cluster.

  Generates a fresh DSA keypair for the Ganeti run-as user (backing up
  any pre-existing key files first), authorizes the new public key and
  thereby prepares password-less intra-cluster SSH.

  @raise errors.OpExecError: if ssh-keygen fails

  """
  (priv_key, pub_key, auth_keys) = ssh.GetUserFiles(constants.GANETI_RUNAS)

  # Move any existing key material out of the way before regenerating
  for keyfile in (priv_key, pub_key):
    if os.path.exists(keyfile):
      utils.CreateBackup(keyfile)
    utils.RemoveFile(keyfile)

  keygen = utils.RunCmd(["ssh-keygen", "-t", "dsa", "-f", priv_key,
                         "-q", "-N", ""])
  if keygen.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             keygen.output)

  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
77

    
78

    
79
def GenerateHmacKey(file_name):
80
  """Writes a new HMAC key.
81

82
  @type file_name: str
83
  @param file_name: Path to output file
84

85
  """
86
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
87
                  backup=True)
88

    
89

    
90
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
                          new_cds, rapi_cert_pem=None, cds=None,
                          nodecert_file=constants.NODED_CERT_FILE,
                          rapicert_file=constants.RAPI_CERT_FILE,
                          hmackey_file=constants.CONFD_HMAC_KEY,
                          cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
  """Updates the cluster certificates, keys and secrets.

  Each item is (re)generated when its "new_*" flag is set or when the
  target file does not exist yet; explicitly supplied content
  (C{rapi_cert_pem}, C{cds}) takes precedence over regeneration.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type rapi_cert_pem: string
  @param rapi_cert_pem: New RAPI certificate in PEM format
  @type cds: string
  @param cds: New cluster domain secret
  @type nodecert_file: string
  @param nodecert_file: optional override of the node cert file path
  @type rapicert_file: string
  @param rapicert_file: optional override of the rapi cert file path
  @type hmackey_file: string
  @param hmackey_file: optional override of the hmac key file path
  @type cds_file: string
  @param cds_file: optional override of the cluster domain secret file path

  """
  # noded SSL certificate: back up the old one (if any) before
  # generating a replacement
  cluster_cert_exists = os.path.exists(nodecert_file)
  if new_cluster_cert or not cluster_cert_exists:
    if cluster_cert_exists:
      utils.CreateBackup(nodecert_file)

    logging.debug("Generating new cluster certificate at %s", nodecert_file)
    utils.GenerateSelfSignedSslCert(nodecert_file)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  # RAPI: a caller-provided PEM wins over the new_rapi_cert flag
  rapi_cert_exists = os.path.exists(rapicert_file)

  if rapi_cert_pem:
    # Assume rapi_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  elif new_rapi_cert or not rapi_cert_exists:
    if rapi_cert_exists:
      utils.CreateBackup(rapicert_file)

    logging.debug("Generating new RAPI certificate at %s", rapicert_file)
    utils.GenerateSelfSignedSslCert(rapicert_file)

  # Cluster domain secret: likewise, a caller-provided value wins
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)
155

    
156

    
157
def _InitGanetiServerSetup(master_name):
  """Prepare and launch the initial node daemon.

  Generates the cluster SSL certificate, starts the node daemon and
  waits until it responds to queries.

  @type master_name: str
  @param master_name: Name of the master node
  @raise errors.OpExecError: if the node daemon cannot be started

  """
  # Only the cluster certificate is regenerated here; the RAPI cert,
  # HMAC key and cluster domain secret are left untouched
  GenerateClusterCrypto(True, False, False, False)

  start_cmd = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
  if start_cmd.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (start_cmd.cmd, start_cmd.exit_code,
                              start_cmd.output))

  _WaitForNodeDaemon(master_name)
177

    
178

    
179
def _WaitForNodeDaemon(node_name):
  """Block until the node daemon on the given node answers RPC queries.

  @type node_name: str
  @param node_name: name of the node whose daemon is polled
  @raise errors.OpExecError: if the daemon is still unresponsive after
      L{_DAEMON_READY_TIMEOUT} seconds

  """
  def _Ping():
    # A failing "version" RPC means the daemon isn't ready yet
    if rpc.RpcRunner.call_version([node_name])[node_name].fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_Ping, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
193

    
194

    
195
def _WaitForMasterDaemon():
  """Block until the master daemon answers LUXI queries.

  @raise errors.OpExecError: if the master daemon doesn't respond within
      L{_DAEMON_READY_TIMEOUT} seconds

  """
  def _Ping():
    try:
      # Tuple unpacking doubles as a sanity check on the answer's shape
      (name, ) = luxi.Client().QueryConfigValues(["cluster_name"])
    except Exception:
      # Any failure (socket missing, daemon still starting up, bad
      # answer) simply means we should try again
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", name)

  try:
    utils.Retry(_Ping, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)
213

    
214

    
215
def _InitFileStorage(file_storage_dir):
  """Initialize if needed the file storage.

  @param file_storage_dir: the user-supplied value
  @return: either empty string (if file storage was disabled at build
      time) or the normalized path to the storage directory
  @raise errors.OpPrereqError: if the path is not absolute, cannot be
      created, or exists but is not a directory

  """
  # Build-time switch: without file storage support there is nothing
  # to set up
  if not constants.ENABLE_FILE_STORAGE:
    return ""

  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory you passed is"
                               " not an absolute path.", errors.ECODE_INVAL)

  # Create the directory (including parents) if it doesn't exist yet
  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  # The path may have existed already as a non-directory (e.g. a file)
  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)
  return file_storage_dir
245

    
246

    
247
def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
                master_netdev, file_storage_dir, candidate_pool_size,
                secondary_ip=None, vg_name=None, beparams=None,
                nicparams=None, hvparams=None, enabled_hypervisors=None,
                modify_etc_hosts=True, modify_ssh_setup=True,
                maintain_node_health=False, drbd_helper=None,
                uid_pool=None, default_iallocator=None,
                primary_ip_version=None, prealloc_wipe_disks=False):
  """Initialise the cluster.

  Performs all prerequisite checks (hostname/IP resolution, volume
  group, DRBD helper, file storage, mac prefix, master netdev), builds
  the initial cluster configuration, writes it to disk and starts the
  daemons on this (master) node.

  @type cluster_name: str
  @param cluster_name: name of the new cluster
  @type mac_prefix: str
  @param mac_prefix: mac address prefix in "xx:xx:xx" form
  @type master_netdev: str
  @param master_netdev: network device for the master IP
  @type candidate_pool_size: int
  @param candidate_pool_size: master candidate pool size
  @type primary_ip_version: int
  @param primary_ip_version: cluster IP version (4 or 6)
  @raise errors.OpPrereqError: if any prerequisite check fails
  @raise errors.OpExecError: if the daemons cannot be started

  """
  if config.ConfigWriter.IsCluster():
    raise errors.OpPrereqError("Cluster is already initialised",
                               errors.ECODE_STATE)

  # At least one hypervisor is required, and all of them must be known
  if not enabled_hypervisors:
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                               " entries: %s" % invalid_hvs,
                               errors.ECODE_INVAL)


  # Select the address class matching the requested IP version; all
  # later IP validity checks go through it
  ipcls = None
  if primary_ip_version == constants.IP4_VERSION:
    ipcls = netutils.IP4Address
  elif primary_ip_version == constants.IP6_VERSION:
    ipcls = netutils.IP6Address
  else:
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
                               primary_ip_version)

  hostname = netutils.GetHostname(family=ipcls.family)
  if not ipcls.IsValid(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
                               " address." % (hostname.ip, primary_ip_version))

  if ipcls.IsLoopback(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
                               " address. Please fix DNS or %s." %
                               (hostname.ip, constants.ETC_HOSTS),
                               errors.ECODE_ENVIRON)

  if not ipcls.Own(hostname.ip):
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                               " to %s,\nbut this ip address does not"
                               " belong to this host" %
                               hostname.ip, errors.ECODE_ENVIRON)

  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)

  # The cluster IP must not be in use yet (noded answering on it would
  # indicate an existing cluster)
  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
    raise errors.OpPrereqError("Cluster IP already active",
                               errors.ECODE_NOTUNIQUE)

  # The secondary IP is always IPv4; default it to the primary when
  # the cluster itself runs on IPv4
  if not secondary_ip:
    if primary_ip_version == constants.IP6_VERSION:
      raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
                                 " IPv4 address must be given as secondary",
                                 errors.ECODE_INVAL)
    secondary_ip = hostname.ip

  if not netutils.IP4Address.IsValid(secondary_ip):
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
                               " IPv4 address." % secondary_ip,
                               errors.ECODE_INVAL)

  if not netutils.IP4Address.Own(secondary_ip):
    raise errors.OpPrereqError("You gave %s as secondary IP,"
                               " but it does not belong to this host." %
                               secondary_ip, errors.ECODE_ENVIRON)

  if vg_name is not None:
    # Check if volume group is valid
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
                                          constants.MIN_VG_SIZE)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                 " you are not using lvm" % vgstatus,
                                 errors.ECODE_INVAL)

  if drbd_helper is not None:
    try:
      curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      raise errors.OpPrereqError("Error while checking drbd helper"
                                 " (specify --no-drbd-storage if you are not"
                                 " using drbd): %s" % str(err),
                                 errors.ECODE_ENVIRON)
    if drbd_helper != curr_helper:
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
                                 " is the current helper" % (drbd_helper,
                                                             curr_helper),
                                 errors.ECODE_INVAL)

  file_storage_dir = _InitFileStorage(file_storage_dir)

  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
                               errors.ECODE_INVAL)

  # Verify the master network device exists
  result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
  if result.failed:
    raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                               (master_netdev,
                                result.output.strip()), errors.ECODE_INVAL)

  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
  utils.EnsureDirs(dirs)

  # Validate parameter dictionaries before they are stored in the config
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
  objects.NIC.CheckParameterSyntax(nicparams)

  # hvparams is a mapping of hypervisor->hvparams dict
  for hv_name, hv_params in hvparams.iteritems():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
    hv_class = hypervisor.GetHypervisor(hv_name)
    hv_class.CheckParameterSyntax(hv_params)

  # set up ssh config and /etc/hosts
  sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
  sshkey = sshline.split(" ")[1]

  if modify_etc_hosts:
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)

  if modify_ssh_setup:
    _InitSSHSetup()

  if default_iallocator is not None:
    alloc_script = utils.FindFile(default_iallocator,
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                 " specified" % default_iallocator,
                                 errors.ECODE_INVAL)

  now = time.time()

  # init of cluster config file
  cluster_config = objects.Cluster(
    serial_no=1,
    rsahostkeypub=sshkey,
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
    mac_prefix=mac_prefix,
    volume_group_name=vg_name,
    tcpudp_port_pool=set(),
    master_node=hostname.name,
    master_ip=clustername.ip,
    master_netdev=master_netdev,
    cluster_name=clustername.name,
    file_storage_dir=file_storage_dir,
    enabled_hypervisors=enabled_hypervisors,
    beparams={constants.PP_DEFAULT: beparams},
    nicparams={constants.PP_DEFAULT: nicparams},
    hvparams=hvparams,
    candidate_pool_size=candidate_pool_size,
    modify_etc_hosts=modify_etc_hosts,
    modify_ssh_setup=modify_ssh_setup,
    uid_pool=uid_pool,
    ctime=now,
    mtime=now,
    maintain_node_health=maintain_node_health,
    drbd_usermode_helper=drbd_helper,
    default_iallocator=default_iallocator,
    primary_ip_family=ipcls.family,
    prealloc_wipe_disks=prealloc_wipe_disks,
    )
  master_node_config = objects.Node(name=hostname.name,
                                    primary_ip=hostname.ip,
                                    secondary_ip=secondary_ip,
                                    serial_no=1,
                                    master_candidate=True,
                                    offline=False, drained=False,
                                    )
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
  cfg = config.ConfigWriter(offline=True)
  ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  backend.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  _InitGanetiServerSetup(hostname.name)

  logging.debug("Starting daemons")
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
  if result.failed:
    raise errors.OpExecError("Could not start daemons, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForMasterDaemon()
447

    
448

    
449
def InitConfig(version, cluster_config, master_node_config,
450
               cfg_file=constants.CLUSTER_CONF_FILE):
451
  """Create the initial cluster configuration.
452

453
  It will contain the current node, which will also be the master
454
  node, and no instances.
455

456
  @type version: int
457
  @param version: configuration version
458
  @type cluster_config: L{objects.Cluster}
459
  @param cluster_config: cluster configuration
460
  @type master_node_config: L{objects.Node}
461
  @param master_node_config: master node configuration
462
  @type cfg_file: string
463
  @param cfg_file: configuration file path
464

465
  """
466
  uuid_generator = config.TemporaryReservationManager()
467
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
468
                                                _INITCONF_ECID)
469
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
470
                                                    _INITCONF_ECID)
471
  nodes = {
472
    master_node_config.name: master_node_config,
473
    }
474
  default_nodegroup = objects.NodeGroup(
475
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
476
    name="default",
477
    members=[master_node_config.name],
478
    )
479
  nodegroups = {
480
    default_nodegroup.uuid: default_nodegroup,
481
    }
482
  now = time.time()
483
  config_data = objects.ConfigData(version=version,
484
                                   cluster=cluster_config,
485
                                   nodegroups=nodegroups,
486
                                   nodes=nodes,
487
                                   instances={},
488
                                   serial_no=1,
489
                                   ctime=now, mtime=now)
490
  utils.WriteFile(cfg_file,
491
                  data=serializer.Dump(config_data.ToDict()),
492
                  mode=0600)
493

    
494

    
495
def FinalizeClusterDestroy(master):
  """Execute the last steps of cluster destroy

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  @param master: name of the cluster's (last remaining) master node

  """
  ssh_setup = config.ConfigWriter().GetClusterInfo().modify_ssh_setup

  # Failures below are only logged: the cluster is going away anyway
  stop_result = rpc.RpcRunner.call_node_stop_master(master, True)
  if stop_result.fail_msg:
    logging.warning("Could not disable the master role: %s",
                    stop_result.fail_msg)

  leave_result = rpc.RpcRunner.call_node_leave_cluster(master, ssh_setup)
  if leave_result.fail_msg:
    logging.warning("Could not shutdown the node daemon and cleanup"
                    " the node: %s", leave_result.fail_msg)
513

    
514

    
515
def SetupNodeDaemon(cluster_name, node, ssh_key_check):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_key_check: whether to do a strict key check
  @raise errors.OpExecError: if a secret file looks invalid or the
      remote command fails

  """
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  sshrunner = ssh.SshRunner(cluster_name,
                            ipv6=(family == netutils.IP6Address.family))

  noded_cert = utils.ReadFile(constants.NODED_CERT_FILE)
  rapi_cert = utils.ReadFile(constants.RAPI_CERT_FILE)
  confd_hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)

  # in the base64 pem encoding, neither '!' nor '.' are valid chars,
  # so we use this to detect an invalid certificate; as long as the
  # cert doesn't contain this, the here-document will be correctly
  # parsed by the shell sequence below. HMAC keys are hexadecimal strings,
  # so the same restrictions apply.
  # Raw string so the backslash reliably reaches the regex engine
  # (non-raw '\.' only worked via the unknown-escape fallthrough).
  for content in (noded_cert, rapi_cert, confd_hmac_key):
    if re.search(r"^!EOF\.", content, re.MULTILINE):
      raise errors.OpExecError("invalid SSL certificate or HMAC key")

  # Ensure each secret ends with a newline before shipping it
  if not noded_cert.endswith("\n"):
    noded_cert += "\n"
  if not rapi_cert.endswith("\n"):
    rapi_cert += "\n"
  if not confd_hmac_key.endswith("\n"):
    confd_hmac_key += "\n"

  bind_address = constants.IP4_ADDRESS_ANY
  if family == netutils.IP6Address.family:
    bind_address = constants.IP6_ADDRESS_ANY

  # set up inter-node password and certificate and restarts the node daemon
  # and then connect with ssh to set password and start ganeti-noded
  # note that all the below variables are sanitized at this point,
  # either by being constants or by the checks above
  sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
  sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
  sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
  mycommand = ("%s stop-all; %s start %s -b '%s'" % (constants.DAEMON_UTIL,
                                                     constants.DAEMON_UTIL,
                                                     constants.NODED,
                                                     bind_address))

  result = sshrunner.Run(node, 'root', mycommand, batch=False,
                         ask_key=ssh_key_check,
                         use_cluster_key=True,
                         strict_host_check=ssh_key_check)
  if result.failed:
    raise errors.OpExecError("Remote command on node %s, error: %s,"
                             " output: %s" %
                             (node, result.fail_reason, result.output))

  _WaitForNodeDaemon(node)
577

    
578

    
579
def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the non-master to become
  new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
                      (dangerous)
  @rtype: int
  @return: 0 on success, 1 on failure (also used as process exit code)

  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_list = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  # This command is run on the *future* master, so being the current
  # master already is an error
  if old_master == new_master:
    raise errors.OpPrereqError("This commands must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ('\n'.join(mc_no_master)),
                               errors.ECODE_STATE)

  # Unless forced, require the other nodes to agree on who the current
  # master is before taking over
  if not no_voting:
    vote_list = GatherMasterVotes(node_list)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # instantiate a real config writer, as we now know we have the
    # configuration data
    cfg = config.ConfigWriter(accept_foreign=True)

    cluster_info = cfg.GetClusterInfo()
    cluster_info.master_node = new_master
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1

  # if cfg.Update worked, then it means the old master daemon won't be
  # able now to write its own config file (we rely on locking in both
  # backend.UploadFile() and ConfigWriter._Write(); hence the next
  # step is to kill the old master

  logging.info("Stopping the master daemon on node %s", old_master)

  result = rpc.RpcRunner.call_node_stop_master(old_master, True)
  msg = result.fail_msg
  if msg:
    logging.error("Could not disable the master role on the old master"
                 " %s, please disable manually: %s", old_master, msg)

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30
  # Here we have a phase where no master should be running
  def _check_ip():
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      raise utils.RetryAgain()

  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
  except utils.RetryTimeout:
    # Best effort: warn and proceed, starting the master here will
    # likely fail if the IP is still held elsewhere
    logging.warning("The master IP is still reachable after %s seconds,"
                    " continuing but activating the master on the current"
                    " node will probably fail", total_timeout)

  logging.info("Starting the master daemons on the new master")

  result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode
688

    
689

    
690
def GetMaster():
  """Return the name of the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing directly ssconf, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  @rtype: str
  @return: name of the master node according to ssconf

  """
  master_name, _ = ssconf.GetMasterAndMyself(ssconf.SimpleStore())
  return master_name
704

    
705

    
706
def GatherMasterVotes(node_list):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  We remove ourselves from the list since we know that (bugs aside)
  since we use the same source for configuration information for both
  backend and boostrap, we'll always vote for ourselves.

  @type node_list: list
  @param node_list: the list of nodes to query for master info; the current
      node will be removed if it is in the list
  @rtype: list
  @return: list of (node, votes)

  """
  myself = netutils.Hostname.GetSysName()
  try:
    # Documented side effect: the current node is removed in place
    node_list.remove(myself)
  except ValueError:
    pass
  if not node_list:
    # no nodes left (eventually after removing myself)
    return []
  results = rpc.RpcRunner.call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_list))]

  votes = {}
  for node in results:
    nres = results[node]
    data = nres.payload
    msg = nres.fail_msg
    if msg:
      logging.warning("Error contacting node %s: %s", node, msg)
      fail = True
    # for now we accept both length 3 and 4 (data[3] is primary ip version)
    elif not isinstance(data, (tuple, list)) or len(data) < 3:
      logging.warning("Invalid data received from node %s: %s", node, data)
      fail = True
    else:
      fail = False
    if fail:
      # all errors are tallied under the None key
      votes[None] = votes.get(None, 0) + 1
      continue
    master_node = data[2]
    votes[master_node] = votes.get(master_node, 0) + 1

  # sort first on number of votes then on name, since we want None
  # sorted later if we have the half of the nodes not responding, and
  # half voting all for the same master
  return sorted(votes.items(), key=lambda x: (x[1], x[0]), reverse=True)