root / lib / bootstrap.py @ fe71113e


#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions to bootstrap a new cluster.

"""

import os
import os.path
import re
import logging
import time
import tempfile

from ganeti.cmdlib import cluster
import ganeti.rpc.node as rpc
from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import hypervisor
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import netutils
from ganeti import luxi
from ganeti import jstore
from ganeti import pathutils
from ganeti import runtime


# ec_id for InitConfig's temporary reservation manager
_INITCONF_ECID = "initconfig-ecid"

#: After how many seconds a daemon must be responsive
_DAEMON_READY_TIMEOUT = 10.0


def _InitSSHSetup():
  """Set up the SSH configuration for the cluster.

  This generates a DSA keypair for root, adds the public key to the
  list of authorized keys and adds the hostkey to its own known hosts.

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))


def GenerateHmacKey(file_name):
  """Writes a new HMAC key.

  @type file_name: str
  @param file_name: Path to output file

  """
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
                  backup=True)


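# A key file produced this way contains a single random secret on one line
# (roughly: the result of utils.GenerateSecret() followed by "\n"), written
# with mode 0400 so that only the owner can read it.

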
# pylint: disable=R0913
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
                          new_confd_hmac_key, new_cds,
                          rapi_cert_pem=None, spice_cert_pem=None,
                          spice_cacert_pem=None, cds=None,
                          nodecert_file=pathutils.NODED_CERT_FILE,
                          rapicert_file=pathutils.RAPI_CERT_FILE,
                          spicecert_file=pathutils.SPICE_CERT_FILE,
                          spicecacert_file=pathutils.SPICE_CACERT_FILE,
                          hmackey_file=pathutils.CONFD_HMAC_KEY,
                          cds_file=pathutils.CLUSTER_DOMAIN_SECRET_FILE):
  """Updates the cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type rapi_cert_pem: string
  @param rapi_cert_pem: New RAPI certificate in PEM format
  @type spice_cert_pem: string
  @param spice_cert_pem: New SPICE certificate in PEM format
  @type spice_cacert_pem: string
  @param spice_cacert_pem: Certificate of the CA that signed the SPICE
                           certificate, in PEM format
  @type cds: string
  @param cds: New cluster domain secret
  @type nodecert_file: string
  @param nodecert_file: optional override of the node cert file path
  @type rapicert_file: string
  @param rapicert_file: optional override of the rapi cert file path
  @type spicecert_file: string
  @param spicecert_file: optional override of the spice cert file path
  @type spicecacert_file: string
  @param spicecacert_file: optional override of the spice CA cert file path
  @type hmackey_file: string
  @param hmackey_file: optional override of the hmac key file path
  @type cds_file: string
  @param cds_file: optional override of the cluster domain secret file path

  """
  # pylint: disable=R0913
  # noded SSL certificate
  utils.GenerateNewSslCert(
    new_cluster_cert, nodecert_file, 1,
    "Generating new cluster certificate at %s" % nodecert_file)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  if rapi_cert_pem:
    # Assume rapi_cert_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  else:
    utils.GenerateNewSslCert(
      new_rapi_cert, rapicert_file, 1,
      "Generating new RAPI certificate at %s" % rapicert_file)

  # SPICE
  spice_cert_exists = os.path.exists(spicecert_file)
  spice_cacert_exists = os.path.exists(spicecacert_file)
  if spice_cert_pem:
    # spice_cert_pem implies also spice_cacert_pem
    logging.debug("Writing SPICE certificate at %s", spicecert_file)
    utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
    logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
    utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
  elif new_spice_cert or not spice_cert_exists:
    if spice_cert_exists:
      utils.CreateBackup(spicecert_file)
    if spice_cacert_exists:
      utils.CreateBackup(spicecacert_file)

    logging.debug("Generating new self-signed SPICE certificate at %s",
                  spicecert_file)
    (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file, 1)

    # Self-signed certificate -> the public certificate is also the CA public
    # certificate
    logging.debug("Writing the public certificate to %s",
                  spicecacert_file)
    utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)

  # Cluster domain secret
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)


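# Illustrative sketch of how the flags combine: a full regeneration of all
# cluster secrets with the default file locations would be
#
#   GenerateClusterCrypto(True, True, True, True, True)
#
# whereas _InitGanetiServerSetup below passes (True, False, False, False,
# False) to renew only the node certificate.

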
def _InitGanetiServerSetup(master_name):
  """Set up the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster, generates the SSL certificate and starts the node daemon.

  @type master_name: str
  @param master_name: Name of the master node

  """
  # Generate cluster secrets
  GenerateClusterCrypto(True, False, False, False, False)

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.NODED])
  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForNodeDaemon(master_name)


def _WaitForNodeDaemon(node_name):
  """Wait for node daemon to become responsive.

  """
  def _CheckNodeDaemon():
    # Pylint bug <http://www.logilab.org/ticket/35642>
    # pylint: disable=E1101
    result = rpc.BootstrapRunner().call_version([node_name])[node_name]
    if result.fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))


def _WaitForMasterDaemon():
  """Wait for master daemon to become responsive.

  """
  def _CheckMasterDaemon():
    try:
      cl = luxi.Client()
      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
    except Exception:
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", cluster_name)

  try:
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)


def _WaitForSshDaemon(hostname, port, family):
  """Wait for SSH daemon to become responsive.

  """
  hostip = netutils.GetHostname(name=hostname, family=family).ip

  def _CheckSshDaemon():
    if netutils.TcpPing(hostip, port, timeout=1.0, live_port_needed=True):
      logging.debug("SSH daemon on %s:%s (IP address %s) has become"
                    " responsive", hostname, port, hostip)
    else:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckSshDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("SSH daemon on %s:%s (IP address %s) didn't"
                             " become responsive within %s seconds" %
                             (hostname, port, hostip, _DAEMON_READY_TIMEOUT))


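# All three _WaitFor*Daemon helpers above follow the same pattern: poll via
# utils.Retry about once per second and convert a RetryTimeout into an
# OpExecError once _DAEMON_READY_TIMEOUT (10 seconds) has passed.

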
def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
                    use_cluster_key, ask_key, strict_host_check,
                    port, data):
  """Runs a command to configure something on a remote machine.

  @type cluster_name: string
  @param cluster_name: Cluster name
  @type node: string
  @param node: Node name
  @type basecmd: string
  @param basecmd: Base command (path on the remote machine)
  @type debug: bool
  @param debug: Enable debug output
  @type verbose: bool
  @param verbose: Enable verbose output
  @type use_cluster_key: bool
  @param use_cluster_key: See L{ssh.SshRunner.BuildCmd}
  @type ask_key: bool
  @param ask_key: See L{ssh.SshRunner.BuildCmd}
  @type strict_host_check: bool
  @param strict_host_check: See L{ssh.SshRunner.BuildCmd}
  @type port: int
  @param port: The SSH port of the remote machine or None for the default
  @param data: JSON-serializable input data for script (passed to stdin)

  """
  cmd = [basecmd]

  # Pass --debug/--verbose to the external script if set on our invocation
  if debug:
    cmd.append("--debug")

  if verbose:
    cmd.append("--verbose")

  logging.debug("Node setup command: %s", cmd)

  # Before running the setup script, refresh the "ganeti/lib" and
  # "ganeti/share" symlinks under SYSCONFDIR so they point at the current
  # version's directories
  version = constants.DIR_VERSION
  all_cmds = [["test", "-d", os.path.join(pathutils.PKGLIBDIR, version)]]
  if constants.HAS_GNU_LN:
    all_cmds.extend([["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  else:
    all_cmds.extend([["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  all_cmds.append(cmd)

  if port is None:
    port = netutils.GetDaemonPort(constants.SSH)

  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  srun = ssh.SshRunner(cluster_name,
                       ipv6=(family == netutils.IP6Address.family))
  scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
                       utils.ShellQuoteArgs(
                           utils.ShellCombineCommands(all_cmds)),
                       batch=False, ask_key=ask_key, quiet=False,
                       strict_host_check=strict_host_check,
                       use_cluster_key=use_cluster_key,
                       port=port)

  tempfh = tempfile.TemporaryFile()
  try:
    tempfh.write(serializer.DumpJson(data))
    tempfh.seek(0)

    result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
  finally:
    tempfh.close()

  if result.failed:
    raise errors.OpExecError("Command '%s' failed: %s" %
                             (result.cmd, result.fail_reason))

  _WaitForSshDaemon(node, port, family)


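# Note that RunNodeSetupCmd never places the payload on the remote command
# line: the data argument is JSON-encoded into a temporary file and fed to
# the remote script's stdin over the SSH connection.

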
def _InitFileStorageDir(file_storage_dir):
  """Initialize the file storage directory if needed.

  @param file_storage_dir: the user-supplied value
  @return: the normalized path to the storage directory

  """
  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
                               " path" % file_storage_dir, errors.ECODE_INVAL)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)

  return file_storage_dir


def _PrepareFileBasedStorage(
    enabled_disk_templates, file_storage_dir,
    default_dir, file_disk_template,
    init_fn=_InitFileStorageDir, acceptance_fn=None):
  """Checks if a file-based storage type is enabled and inits the dir.

  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates
  @type file_storage_dir: string
  @param file_storage_dir: the file storage directory
  @type default_dir: string
  @param default_dir: default file storage directory when C{file_storage_dir}
      is 'None'
  @type file_disk_template: string
  @param file_disk_template: a disk template whose storage type is 'ST_FILE' or
      'ST_SHARED_FILE'
  @rtype: string
  @return: the name of the actual file storage directory

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))

  if file_storage_dir is None:
    file_storage_dir = default_dir
  if not acceptance_fn:
    acceptance_fn = \
        lambda path: filestorage.CheckFileStoragePathAcceptance(
            path, exact_match_ok=True)

  cluster.CheckFileStoragePathVsEnabledDiskTemplates(
      logging.warning, file_storage_dir, enabled_disk_templates)

  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_enabled:
    try:
      acceptance_fn(file_storage_dir)
    except errors.FileStoragePathError as e:
      raise errors.OpPrereqError(str(e))
    result_file_storage_dir = init_fn(file_storage_dir)
  else:
    result_file_storage_dir = file_storage_dir
  return result_file_storage_dir


def _PrepareFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_FILE_STORAGE_DIR, constants.DT_FILE,
      init_fn=init_fn, acceptance_fn=acceptance_fn)


def _PrepareSharedFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if shared file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR, constants.DT_SHARED_FILE,
      init_fn=init_fn, acceptance_fn=acceptance_fn)


def _PrepareGlusterStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if gluster storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_GLUSTER_STORAGE_DIR, constants.DT_GLUSTER,
      init_fn=init_fn, acceptance_fn=acceptance_fn)


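# The three wrappers above differ only in the defaults they fill in; for
# example, _PrepareFileStorage(templates, dir) is equivalent to
#
#   _PrepareFileBasedStorage(templates, dir,
#                            pathutils.DEFAULT_FILE_STORAGE_DIR,
#                            constants.DT_FILE)

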
def _InitCheckEnabledDiskTemplates(enabled_disk_templates):
  """Checks the sanity of the enabled disk templates.

  """
  if not enabled_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_disk_templates = \
    set(enabled_disk_templates) - constants.DISK_TEMPLATES
  if invalid_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list contains invalid"
                               " entries: %s" % invalid_disk_templates,
                               errors.ECODE_INVAL)


def _RestrictIpolicyToEnabledDiskTemplates(ipolicy, enabled_disk_templates):
  """Restricts the ipolicy's disk templates to the enabled ones.

  This function removes from the ipolicy's list of allowed disk
  templates any entries that are not enabled by the cluster.

  @type ipolicy: dict
  @param ipolicy: the instance policy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of cluster-wide enabled disk
    templates

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
  restricted_disk_templates = list(set(allowed_disk_templates)
                                   .intersection(set(enabled_disk_templates)))
  ipolicy[constants.IPOLICY_DTS] = restricted_disk_templates


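# A small worked example: if ipolicy[constants.IPOLICY_DTS] is
# ["drbd", "file", "plain"] and enabled_disk_templates is ["drbd", "plain"],
# the allowed list is replaced by the intersection, i.e. ["drbd", "plain"]
# (in arbitrary order, as the computation goes through sets).

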
def _InitCheckDrbdHelper(drbd_helper, drbd_enabled):
  """Checks the DRBD usermode helper.

  @type drbd_helper: string
  @param drbd_helper: name of the DRBD usermode helper that the system should
    use
  @type drbd_enabled: bool
  @param drbd_enabled: whether the DRBD disk template is enabled; if it is
    not, the check is skipped

  """
  if not drbd_enabled:
    return

  if drbd_helper is not None:
    try:
      curr_helper = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      raise errors.OpPrereqError("Error while checking drbd helper"
                                 " (disable drbd with --enabled-disk-templates"
                                 " if you are not using drbd): %s" % str(err),
                                 errors.ECODE_ENVIRON)
    if drbd_helper != curr_helper:
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
                                 " is the current helper" % (drbd_helper,
                                                             curr_helper),
                                 errors.ECODE_INVAL)


def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
                master_netmask, master_netdev, file_storage_dir,
                shared_file_storage_dir, gluster_storage_dir,
                candidate_pool_size, secondary_ip=None,
                vg_name=None, beparams=None, nicparams=None, ndparams=None,
                hvparams=None, diskparams=None, enabled_hypervisors=None,
                modify_etc_hosts=True, modify_ssh_setup=True,
                maintain_node_health=False, drbd_helper=None, uid_pool=None,
                default_iallocator=None, default_iallocator_params=None,
                primary_ip_version=None, ipolicy=None,
                prealloc_wipe_disks=False, use_external_mip_script=False,
                hv_state=None, disk_state=None, enabled_disk_templates=None,
                zeroing_image=None, compression_tools=None):
  """Initialise the cluster.

  @type candidate_pool_size: int
  @param candidate_pool_size: master candidate pool size
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of disk_templates to be used in this
    cluster

  """
  # TODO: complete the docstring
  if config.ConfigWriter.IsCluster():
    raise errors.OpPrereqError("Cluster is already initialised",
                               errors.ECODE_STATE)

  if not enabled_hypervisors:
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                               " entries: %s" % invalid_hvs,
                               errors.ECODE_INVAL)

  _InitCheckEnabledDiskTemplates(enabled_disk_templates)

  try:
    ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
                               primary_ip_version, errors.ECODE_INVAL)

  hostname = netutils.GetHostname(family=ipcls.family)
  if not ipcls.IsValid(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
                               " address." % (hostname.ip, primary_ip_version),
                               errors.ECODE_INVAL)

  if ipcls.IsLoopback(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
                               " address. Please fix DNS or %s." %
                               (hostname.ip, pathutils.ETC_HOSTS),
                               errors.ECODE_ENVIRON)

  if not ipcls.Own(hostname.ip):
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                               " to %s,\nbut this ip address does not"
                               " belong to this host" %
                               hostname.ip, errors.ECODE_ENVIRON)

  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)

  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
    raise errors.OpPrereqError("Cluster IP already active",
                               errors.ECODE_NOTUNIQUE)

  if not secondary_ip:
    if primary_ip_version == constants.IP6_VERSION:
      raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                 " IPv4 address must be given as secondary",
                                 errors.ECODE_INVAL)
    secondary_ip = hostname.ip

  if not netutils.IP4Address.IsValid(secondary_ip):
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
                               " IPv4 address." % secondary_ip,
                               errors.ECODE_INVAL)

  if not netutils.IP4Address.Own(secondary_ip):
    raise errors.OpPrereqError("You gave %s as secondary IP,"
                               " but it does not belong to this host." %
                               secondary_ip, errors.ECODE_ENVIRON)

  if master_netmask is not None:
    if not ipcls.ValidateNetmask(master_netmask):
      raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s" %
                                 (master_netmask, primary_ip_version),
                                 errors.ECODE_INVAL)
  else:
    master_netmask = ipcls.iplen

  if vg_name:
    # Check if volume group is valid
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
                                          constants.MIN_VG_SIZE)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus, errors.ECODE_INVAL)

  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
  _InitCheckDrbdHelper(drbd_helper, drbd_enabled)

  logging.debug("Stopping daemons (if any are running)")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-all"])
  if result.failed:
    raise errors.OpExecError("Could not stop daemons, command %s"
                             " had exitcode %s and error '%s'" %
                             (result.cmd, result.exit_code, result.output))

  file_storage_dir = _PrepareFileStorage(enabled_disk_templates,
                                         file_storage_dir)
  shared_file_storage_dir = _PrepareSharedFileStorage(enabled_disk_templates,
                                                      shared_file_storage_dir)

  # e.g. "aa:00:00"; note that only lowercase characters are accepted
  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
                               errors.ECODE_INVAL)

  if nicparams.get("mode", None) != constants.NIC_MODE_OVS:
    # Do not do this check if mode=openvswitch, since the openvswitch is not
    # created yet
    result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (master_netdev,
                                  result.output.strip()), errors.ECODE_INVAL)

  dirs = [(pathutils.RUN_DIR, constants.RUN_DIRS_MODE)]
  utils.EnsureDirs(dirs)

  objects.UpgradeBeParams(beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  objects.NIC.CheckParameterSyntax(nicparams)

  full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)
  _RestrictIpolicyToEnabledDiskTemplates(full_ipolicy, enabled_disk_templates)

  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
  else:
    ndparams = dict(constants.NDC_DEFAULTS)

  # This is ugly, as we modify the dict itself
  # FIXME: Make utils.ForceDictType pure functional or write a wrapper
  # around it
  if hv_state:
    for hvname, hvs_data in hv_state.items():
      utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
      hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
  else:
    hv_state = dict((hvname, constants.HVST_DEFAULTS)
                    for hvname in enabled_hypervisors)

  # FIXME: disk_state has no default values yet
  if disk_state:
    for storage, ds_data in disk_state.items():
      if storage not in constants.DS_VALID_TYPES:
        raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
                                   storage, errors.ECODE_INVAL)
      for ds_name, state in ds_data.items():
        utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
        ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)

  # hvparams is a mapping of hypervisor->hvparams dict
  for hv_name, hv_params in hvparams.iteritems():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
    hv_class = hypervisor.GetHypervisor(hv_name)
    hv_class.CheckParameterSyntax(hv_params)

  # diskparams is a mapping of disk-template->diskparams dict
  for template, dt_params in diskparams.items():
    param_keys = set(dt_params.keys())
    default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
    if not (param_keys <= default_param_keys):
      unknown_params = param_keys - default_param_keys
      raise errors.OpPrereqError("Invalid parameters for disk template %s:"
                                 " %s" % (template,
                                          utils.CommaJoin(unknown_params)),
                                 errors.ECODE_INVAL)
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
    if template == constants.DT_DRBD8 and vg_name is not None:
      # The default METAVG value is equal to the VG name set at init time,
      # if provided
      dt_params[constants.DRBD_DEFAULT_METAVG] = vg_name

  try:
    utils.VerifyDictOptions(diskparams, constants.DISK_DT_DEFAULTS)
  except errors.OpPrereqError, err:
    raise errors.OpPrereqError("While verifying diskparam options: %s" % err,
                               errors.ECODE_INVAL)

  # set up ssh config and /etc/hosts
  rsa_sshkey = ""
  dsa_sshkey = ""
  if os.path.isfile(pathutils.SSH_HOST_RSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_RSA_PUB)
    rsa_sshkey = sshline.split(" ")[1]
  if os.path.isfile(pathutils.SSH_HOST_DSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_DSA_PUB)
    dsa_sshkey = sshline.split(" ")[1]
  if not rsa_sshkey and not dsa_sshkey:
    raise errors.OpPrereqError("Failed to find SSH public keys",
                               errors.ECODE_ENVIRON)

  if modify_etc_hosts:
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)

  if modify_ssh_setup:
    _InitSSHSetup()

  if default_iallocator is not None:
    alloc_script = utils.FindFile(default_iallocator,
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                 " specified" % default_iallocator,
                                 errors.ECODE_INVAL)
  else:
    # default to htools
    if utils.FindFile(constants.IALLOC_HAIL,
                      constants.IALLOCATOR_SEARCH_PATH,
                      os.path.isfile):
      default_iallocator = constants.IALLOC_HAIL

  # check if we have all the users we need
  try:
    runtime.GetEnts()
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Required system user/group missing: %s" %
                               err, errors.ECODE_ENVIRON)

  candidate_certs = {}

  now = time.time()

  if compression_tools is not None:
    cluster.CheckCompressionTools(compression_tools)

  # init of cluster config file
  cluster_config = objects.Cluster(
    serial_no=1,
    rsahostkeypub=rsa_sshkey,
    dsahostkeypub=dsa_sshkey,
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
    mac_prefix=mac_prefix,
    volume_group_name=vg_name,
    tcpudp_port_pool=set(),
    master_ip=clustername.ip,
    master_netmask=master_netmask,
    master_netdev=master_netdev,
    cluster_name=clustername.name,
    file_storage_dir=file_storage_dir,
    shared_file_storage_dir=shared_file_storage_dir,
    gluster_storage_dir=gluster_storage_dir,
    enabled_hypervisors=enabled_hypervisors,
    beparams={constants.PP_DEFAULT: beparams},
    nicparams={constants.PP_DEFAULT: nicparams},
    ndparams=ndparams,
    hvparams=hvparams,
    diskparams=diskparams,
    candidate_pool_size=candidate_pool_size,
    modify_etc_hosts=modify_etc_hosts,
    modify_ssh_setup=modify_ssh_setup,
    uid_pool=uid_pool,
    ctime=now,
    mtime=now,
    maintain_node_health=maintain_node_health,
    drbd_usermode_helper=drbd_helper,
    default_iallocator=default_iallocator,
    default_iallocator_params=default_iallocator_params,
    primary_ip_family=ipcls.family,
    prealloc_wipe_disks=prealloc_wipe_disks,
    use_external_mip_script=use_external_mip_script,
    ipolicy=full_ipolicy,
    hv_state_static=hv_state,
    disk_state_static=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    candidate_certs=candidate_certs,
    osparams={},
    osparams_private_cluster={},
    zeroing_image=zeroing_image,
    compression_tools=compression_tools
    )
  master_node_config = objects.Node(name=hostname.name,
                                    primary_ip=hostname.ip,
                                    secondary_ip=secondary_ip,
                                    serial_no=1,
                                    master_candidate=True,
                                    offline=False, drained=False,
                                    ctime=now, mtime=now,
                                    )
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
  cfg = config.ConfigWriter(offline=True)
  ssh.WriteKnownHostsFile(cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  ssconf.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  _InitGanetiServerSetup(hostname.name)

  logging.debug("Starting daemons")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
  if result.failed:
    raise errors.OpExecError("Could not start daemons, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForMasterDaemon()


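# For orientation: InitCluster above drives the remaining bootstrap steps in
# this order: InitConfig writes the initial configuration file, a
# config.ConfigWriter opened with offline=True rewrites it together with the
# ssconf files, and _InitGanetiServerSetup generates the node certificate
# and starts noded before the master daemon is awaited.

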
def InitConfig(version, cluster_config, master_node_config,
               cfg_file=pathutils.CLUSTER_CONF_FILE):
  """Create the initial cluster configuration.

  It will contain the current node, which will also be the master
  node, and no instances.

  @type version: int
  @param version: configuration version
  @type cluster_config: L{objects.Cluster}
  @param cluster_config: cluster configuration
  @type master_node_config: L{objects.Node}
  @param master_node_config: master node configuration
  @type cfg_file: string
  @param cfg_file: configuration file path

  """
  uuid_generator = config.TemporaryReservationManager()
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                _INITCONF_ECID)
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                    _INITCONF_ECID)
  cluster_config.master_node = master_node_config.uuid
  nodes = {
    master_node_config.uuid: master_node_config,
    }
  default_nodegroup = objects.NodeGroup(
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
    name=constants.INITIAL_NODE_GROUP_NAME,
    members=[master_node_config.uuid],
    diskparams={},
    )
  nodegroups = {
    default_nodegroup.uuid: default_nodegroup,
    }
  now = time.time()
  config_data = objects.ConfigData(version=version,
                                   cluster=cluster_config,
                                   nodegroups=nodegroups,
                                   nodes=nodes,
                                   instances={},
                                   networks={},
                                   disks={},
                                   serial_no=1,
                                   ctime=now, mtime=now)
  utils.WriteFile(cfg_file,
                  data=serializer.Dump(config_data.ToDict()),
                  mode=0600)


def FinalizeClusterDestroy(master_uuid):
  """Execute the last steps of cluster destroy.

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  """
  livelock = utils.livelock.LiveLock("bootstrap_destroy")
  cfg = config.GetConfig(None, livelock)
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
  runner = rpc.BootstrapRunner()

  master_name = cfg.GetNodeName(master_uuid)

  master_params = cfg.GetMasterNetworkParameters()
  master_params.uuid = master_uuid
  ems = cfg.GetUseExternalMipScript()
  result = runner.call_node_deactivate_master_ip(master_name, master_params,
                                                 ems)

  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master IP: %s", msg)

  result = runner.call_node_stop_master(master_name)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master role: %s", msg)

  result = runner.call_node_leave_cluster(master_name, modify_ssh_setup)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not shut down the node daemon and clean up"
                    " the node: %s", msg)


def SetupNodeDaemon(opts, cluster_name, node, ssh_port):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param opts: the command line options
  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_port: the SSH port of the new node

  """
  data = {
    constants.NDS_CLUSTER_NAME: cluster_name,
    constants.NDS_NODE_DAEMON_CERTIFICATE:
      utils.ReadFile(pathutils.NODED_CERT_FILE),
    constants.NDS_SSCONF: ssconf.SimpleStore().ReadAll(),
    constants.NDS_START_NODE_DAEMON: True,
    }

  RunNodeSetupCmd(cluster_name, node, pathutils.NODE_DAEMON_SETUP,
                  opts.debug, opts.verbose,
                  True, opts.ssh_key_check, opts.ssh_key_check,
                  ssh_port, data)

  _WaitForNodeDaemon(node)


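# Note that SetupNodeDaemon uses opts.ssh_key_check for both the ask_key and
# strict_host_check arguments of RunNodeSetupCmd, and that the stdin payload
# ships the node daemon certificate plus the complete ssconf contents to the
# new node.

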
def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the non-master to become
  new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
                      (dangerous)

  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_names = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  if old_master == new_master:
    raise errors.OpPrereqError("This command must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ("\n".join(mc_no_master)),
                               errors.ECODE_STATE)

  if not no_voting:
    vote_list = GatherMasterVotes(node_names)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # Start WConfd so that we can access the configuration
    result = utils.RunCmd([pathutils.DAEMON_UTIL,
                           "start", constants.WCONFD, "--force-node"])
    if result.failed:
      raise errors.OpPrereqError("Could not start the configuration daemon,"
                                 " command %s had exitcode %s and error %s" %
                                 (result.cmd, result.exit_code, result.output),
                                 errors.ECODE_NOENT)

    # instantiate a real config writer, as we now know we have the
    # configuration data
    livelock = utils.livelock.LiveLock("bootstrap_failover")
    cfg = config.GetConfig(None, livelock, accept_foreign=True)

    old_master_node = cfg.GetNodeInfoByName(old_master)
    if old_master_node is None:
      raise errors.OpPrereqError("Could not find old master node '%s' in"
                                 " cluster configuration." % old_master,
                                 errors.ECODE_NOENT)

    cluster_info = cfg.GetClusterInfo()
    new_master_node = cfg.GetNodeInfoByName(new_master)
    if new_master_node is None:
      raise errors.OpPrereqError("Could not find new master node '%s' in"
                                 " cluster configuration." % new_master,
                                 errors.ECODE_NOENT)

    cluster_info.master_node = new_master_node.uuid
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)

    # if cfg.Update worked, then it means the old master daemon won't be
    # able now to write its own config file (we rely on locking in both
    # backend.UploadFile() and ConfigWriter._Write()); hence the next
    # step is to kill the old master

    logging.info("Stopping the master daemon on node %s", old_master)

    runner = rpc.BootstrapRunner()
    master_params = cfg.GetMasterNetworkParameters()
    master_params.uuid = old_master_node.uuid
    ems = cfg.GetUseExternalMipScript()
    result = runner.call_node_deactivate_master_ip(old_master,
                                                   master_params, ems)

    msg = result.fail_msg
    if msg:
      logging.warning("Could not disable the master IP: %s", msg)

    result = runner.call_node_stop_master(old_master)
    msg = result.fail_msg
    if msg:
      logging.error("Could not disable the master role on the old master"
                    " %s, please disable manually: %s", old_master, msg)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1
  finally:
    # stop WConfd again:
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.WCONFD])
    if result.failed:
      logging.error("Could not stop the configuration daemon,"
                    " command %s had exitcode %s and error %s",
                    result.cmd, result.exit_code, result.output)

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30

  # Here we have a phase where no master should be running
  def _check_ip():
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      raise utils.RetryAgain()

  try:
    # the delay tuple asks utils.Retry for an increasing sleep between
    # attempts (roughly 1s, growing by a factor of 1.5, capped at 5s)
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
  except utils.RetryTimeout:
    logging.warning("The master IP is still reachable after %s seconds,"
                    " continuing but activating the master on the current"
                    " node will probably fail", total_timeout)

  if jstore.CheckDrainFlag():
    logging.info("Undraining job queue")
    jstore.SetDrainFlag(False)

  logging.info("Starting the master daemons on the new master")

  result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
                                                                no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode


def GetMaster():
  """Returns the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing ssconf directly, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  """
  sstore = ssconf.SimpleStore()

  old_master, _ = ssconf.GetMasterAndMyself(sstore)

  return old_master


def GatherMasterVotes(node_names):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  We remove ourselves from the list since we know that (bugs aside),
  because we use the same source of configuration information for both
  backend and bootstrap, we'll always vote for ourselves.

  @type node_names: list
  @param node_names: the list of nodes to query for master info; the current
      node will be removed if it is in the list
  @rtype: list
  @return: list of (node, votes)

  """
  myself = netutils.Hostname.GetSysName()
  try:
    node_names.remove(myself)
  except ValueError:
    pass
  if not node_names:
    # no nodes left (possibly after removing myself)
    return []
  results = rpc.BootstrapRunner().call_master_node_name(node_names)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_names))]
  votes = {}
  for node_name in results:
    nres = results[node_name]
    msg = nres.fail_msg

    if msg:
      logging.warning("Error contacting node %s: %s", node_name, msg)
      node = None
    else:
      node = nres.payload

    if node not in votes:
      votes[node] = 1
    else:
      votes[node] += 1

  vote_list = list(votes.items())
  # sort first on number of votes then on name, since we want None
  # sorted later if half of the nodes are not responding and the other
  # half all vote for the same master
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)

  return vote_list
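
# Worked example (sketch): if two nodes answer "node1", one answers "node2"
# and one RPC fails, votes becomes {"node1": 2, "node2": 1, None: 1} and the
# returned list is [("node1", 2), ("node2", 1), (None, 1)].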