lib/bootstrap.py @ 7228ca91

#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions to bootstrap a new cluster.

"""

import os
import os.path
import re
import logging
import time

from ganeti import rpc
from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import hypervisor
from ganeti import bdev
from ganeti import netutils
from ganeti import backend
from ganeti import luxi
from ganeti import jstore


# ec_id for InitConfig's temporary reservation manager
_INITCONF_ECID = "initconfig-ecid"

#: Time (in seconds) within which a daemon must become responsive
_DAEMON_READY_TIMEOUT = 10.0


def _InitSSHSetup():
  """Set up the SSH configuration for the cluster.

  This generates a DSA keypair for root, adds the public key to the
  permitted hosts and adds the host key to its own known hosts.

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))


def GenerateHmacKey(file_name):
  """Writes a new HMAC key.

  @type file_name: str
  @param file_name: Path to output file

  """
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
                  backup=True)


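# Illustrative usage (added commentary, not part of the original module):
# regenerating the confd HMAC key in place, using the same path constant
# this module uses as a default below.
#
#   GenerateHmacKey(constants.CONFD_HMAC_KEY)
#
# The key is a freshly generated secret written with mode 0400; an existing
# file is backed up first because of backup=True.

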
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
                          new_confd_hmac_key, new_cds,
                          rapi_cert_pem=None, spice_cert_pem=None,
                          spice_cacert_pem=None, cds=None,
                          nodecert_file=constants.NODED_CERT_FILE,
                          rapicert_file=constants.RAPI_CERT_FILE,
                          spicecert_file=constants.SPICE_CERT_FILE,
                          spicecacert_file=constants.SPICE_CACERT_FILE,
                          hmackey_file=constants.CONFD_HMAC_KEY,
                          cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
  """Updates the cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type rapi_cert_pem: string
  @param rapi_cert_pem: New RAPI certificate in PEM format
  @type spice_cert_pem: string
  @param spice_cert_pem: New SPICE certificate in PEM format
  @type spice_cacert_pem: string
  @param spice_cacert_pem: Certificate of the CA that signed the SPICE
                           certificate, in PEM format
  @type cds: string
  @param cds: New cluster domain secret
  @type nodecert_file: string
  @param nodecert_file: optional override of the node cert file path
  @type rapicert_file: string
  @param rapicert_file: optional override of the rapi cert file path
  @type spicecert_file: string
  @param spicecert_file: optional override of the spice cert file path
  @type spicecacert_file: string
  @param spicecacert_file: optional override of the spice CA cert file path
  @type hmackey_file: string
  @param hmackey_file: optional override of the hmac key file path
  @type cds_file: string
  @param cds_file: optional override of the cluster domain secret file path

  """
  # noded SSL certificate
  cluster_cert_exists = os.path.exists(nodecert_file)
  if new_cluster_cert or not cluster_cert_exists:
    if cluster_cert_exists:
      utils.CreateBackup(nodecert_file)

    logging.debug("Generating new cluster certificate at %s", nodecert_file)
    utils.GenerateSelfSignedSslCert(nodecert_file)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  # RAPI
  rapi_cert_exists = os.path.exists(rapicert_file)

  if rapi_cert_pem:
    # Assume rapi_cert_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  elif new_rapi_cert or not rapi_cert_exists:
    if rapi_cert_exists:
      utils.CreateBackup(rapicert_file)

    logging.debug("Generating new RAPI certificate at %s", rapicert_file)
    utils.GenerateSelfSignedSslCert(rapicert_file)

  # SPICE
  spice_cert_exists = os.path.exists(spicecert_file)
  spice_cacert_exists = os.path.exists(spicecacert_file)
  if spice_cert_pem:
    # spice_cert_pem implies also spice_cacert_pem
    logging.debug("Writing SPICE certificate at %s", spicecert_file)
    utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
    logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
    utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
  elif new_spice_cert or not spice_cert_exists:
    if spice_cert_exists:
      utils.CreateBackup(spicecert_file)
    if spice_cacert_exists:
      utils.CreateBackup(spicecacert_file)

    logging.debug("Generating new self-signed SPICE certificate at %s",
                  spicecert_file)
    (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file)

    # Self-signed certificate -> the public certificate is also the CA public
    # certificate
    logging.debug("Writing the public certificate to %s",
                  spicecacert_file)
    utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)

  # Cluster domain secret
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)


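# Illustrative usage (added commentary): rotating only the RAPI certificate
# on an existing installation, leaving the other keys and secrets untouched;
# all file paths fall back to the constants used as defaults above.
#
#   GenerateClusterCrypto(new_cluster_cert=False, new_rapi_cert=True,
#                         new_spice_cert=False, new_confd_hmac_key=False,
#                         new_cds=False)

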
def _InitGanetiServerSetup(master_name):
  """Set up the necessary configuration for the initial node daemon.

  This creates the nodepass file containing the shared password for
  the cluster, generates the SSL certificate and starts the node daemon.

  @type master_name: str
  @param master_name: Name of the master node

  """
  # Generate cluster secrets
  GenerateClusterCrypto(True, False, False, False, False)

  result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForNodeDaemon(master_name)


def _WaitForNodeDaemon(node_name):
  """Wait for node daemon to become responsive.

  """
  def _CheckNodeDaemon():
    # Pylint bug <http://www.logilab.org/ticket/35642>
    # pylint: disable=E1101
    result = rpc.BootstrapRunner().call_version([node_name])[node_name]
    if result.fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))


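# Note on the retry pattern above (added commentary): utils.Retry calls the
# check function repeatedly, waiting about 1.0 second between attempts,
# until the function returns without raising utils.RetryAgain or the
# _DAEMON_READY_TIMEOUT expires, in which case utils.RetryTimeout is raised.
# _WaitForMasterDaemon below relies on the same mechanism.

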
def _WaitForMasterDaemon():
  """Wait for master daemon to become responsive.

  """
  def _CheckMasterDaemon():
    try:
      cl = luxi.Client()
      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
    except Exception:
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", cluster_name)

  try:
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)


def _InitFileStorage(file_storage_dir):
  """Initialize the file storage, if needed.

  @param file_storage_dir: the user-supplied value
  @return: either empty string (if file storage was disabled at build
      time) or the normalized path to the storage directory

  """
  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
                               " path" % file_storage_dir, errors.ECODE_INVAL)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)
  return file_storage_dir


def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
                master_netmask, master_netdev, file_storage_dir,
                shared_file_storage_dir, candidate_pool_size, secondary_ip=None,
                vg_name=None, beparams=None, nicparams=None, ndparams=None,
                hvparams=None, diskparams=None, enabled_hypervisors=None,
                modify_etc_hosts=True, modify_ssh_setup=True,
                maintain_node_health=False, drbd_helper=None, uid_pool=None,
                default_iallocator=None, primary_ip_version=None, ipolicy=None,
                prealloc_wipe_disks=False, use_external_mip_script=False,
                hv_state=None, disk_state=None):
  """Initialise the cluster.

  @type candidate_pool_size: int
  @param candidate_pool_size: master candidate pool size

  """
  # TODO: complete the docstring
  if config.ConfigWriter.IsCluster():
    raise errors.OpPrereqError("Cluster is already initialised",
                               errors.ECODE_STATE)

  if not enabled_hypervisors:
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                               " entries: %s" % invalid_hvs,
                               errors.ECODE_INVAL)

  try:
    ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
                               primary_ip_version)

  hostname = netutils.GetHostname(family=ipcls.family)
  if not ipcls.IsValid(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
                               " address." % (hostname.ip, primary_ip_version))

  if ipcls.IsLoopback(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
                               " address. Please fix DNS or %s." %
                               (hostname.ip, constants.ETC_HOSTS),
                               errors.ECODE_ENVIRON)

  if not ipcls.Own(hostname.ip):
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                               " to %s,\nbut this ip address does not"
                               " belong to this host" %
                               hostname.ip, errors.ECODE_ENVIRON)

  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)

  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
    raise errors.OpPrereqError("Cluster IP already active",
                               errors.ECODE_NOTUNIQUE)

  if not secondary_ip:
    if primary_ip_version == constants.IP6_VERSION:
      raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                 " IPv4 address must be given as secondary",
                                 errors.ECODE_INVAL)
    secondary_ip = hostname.ip

  if not netutils.IP4Address.IsValid(secondary_ip):
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
                               " IPv4 address." % secondary_ip,
                               errors.ECODE_INVAL)

  if not netutils.IP4Address.Own(secondary_ip):
    raise errors.OpPrereqError("You gave %s as secondary IP,"
                               " but it does not belong to this host." %
                               secondary_ip, errors.ECODE_ENVIRON)

  if master_netmask is not None:
    if not ipcls.ValidateNetmask(master_netmask):
      raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s" %
                                 (master_netmask, primary_ip_version))
  else:
    master_netmask = ipcls.iplen

  if vg_name is not None:
    # Check if volume group is valid
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
                                          constants.MIN_VG_SIZE)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
                                 " you are not using lvm" % vgstatus,
                                 errors.ECODE_INVAL)

  if drbd_helper is not None:
    try:
      curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      raise errors.OpPrereqError("Error while checking drbd helper"
                                 " (specify --no-drbd-storage if you are not"
                                 " using drbd): %s" % str(err),
                                 errors.ECODE_ENVIRON)
    if drbd_helper != curr_helper:
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
                                 " is the current helper" % (drbd_helper,
                                                             curr_helper),
                                 errors.ECODE_INVAL)

  if constants.ENABLE_FILE_STORAGE:
    file_storage_dir = _InitFileStorage(file_storage_dir)
  else:
    file_storage_dir = ""

  if constants.ENABLE_SHARED_FILE_STORAGE:
    shared_file_storage_dir = _InitFileStorage(shared_file_storage_dir)
  else:
    shared_file_storage_dir = ""

  if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
                               errors.ECODE_INVAL)

  result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
  if result.failed:
    raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                               (master_netdev,
                                result.output.strip()), errors.ECODE_INVAL)

  dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
  utils.EnsureDirs(dirs)

  objects.UpgradeBeParams(beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  objects.NIC.CheckParameterSyntax(nicparams)

  full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)

  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
  else:
    ndparams = dict(constants.NDC_DEFAULTS)

  # This is ugly, as we modify the dict itself
  # FIXME: Make utils.ForceDictType pure functional or write a wrapper
  # around it
  if hv_state:
    for hvname, hvs_data in hv_state.items():
      utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
      hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
  else:
    hv_state = dict((hvname, constants.HVST_DEFAULTS)
                    for hvname in enabled_hypervisors)

  # FIXME: disk_state has no default values yet
  if disk_state:
    for storage, ds_data in disk_state.items():
      if storage not in constants.DS_VALID_TYPES:
        raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
                                   storage, errors.ECODE_INVAL)
      for ds_name, state in ds_data.items():
        utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
        ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)

  # hvparams is a mapping of hypervisor->hvparams dict
  for hv_name, hv_params in hvparams.iteritems():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
    hv_class = hypervisor.GetHypervisor(hv_name)
    hv_class.CheckParameterSyntax(hv_params)

  # diskparams is a mapping of disk-template->diskparams dict
  for template, dt_params in diskparams.items():
    param_keys = set(dt_params.keys())
    default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
    if not (param_keys <= default_param_keys):
      unknown_params = param_keys - default_param_keys
      raise errors.OpPrereqError("Invalid parameters for disk template %s:"
                                 " %s" % (template,
                                          utils.CommaJoin(unknown_params)))
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  # set up ssh config and /etc/hosts
  sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
  sshkey = sshline.split(" ")[1]

  if modify_etc_hosts:
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)

  if modify_ssh_setup:
    _InitSSHSetup()

  if default_iallocator is not None:
    alloc_script = utils.FindFile(default_iallocator,
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                 " specified" % default_iallocator,
                                 errors.ECODE_INVAL)
  elif constants.HTOOLS:
    # htools was enabled at build-time, we default to it
    if utils.FindFile(constants.IALLOC_HAIL,
                      constants.IALLOCATOR_SEARCH_PATH,
                      os.path.isfile):
      default_iallocator = constants.IALLOC_HAIL

  now = time.time()

  # init of cluster config file
  cluster_config = objects.Cluster(
    serial_no=1,
    rsahostkeypub=sshkey,
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
    mac_prefix=mac_prefix,
    volume_group_name=vg_name,
    tcpudp_port_pool=set(),
    master_node=hostname.name,
    master_ip=clustername.ip,
    master_netmask=master_netmask,
    master_netdev=master_netdev,
    cluster_name=clustername.name,
    file_storage_dir=file_storage_dir,
    shared_file_storage_dir=shared_file_storage_dir,
    enabled_hypervisors=enabled_hypervisors,
    beparams={constants.PP_DEFAULT: beparams},
    nicparams={constants.PP_DEFAULT: nicparams},
    ndparams=ndparams,
    hvparams=hvparams,
    diskparams=diskparams,
    candidate_pool_size=candidate_pool_size,
    modify_etc_hosts=modify_etc_hosts,
    modify_ssh_setup=modify_ssh_setup,
    uid_pool=uid_pool,
    ctime=now,
    mtime=now,
    maintain_node_health=maintain_node_health,
    drbd_usermode_helper=drbd_helper,
    default_iallocator=default_iallocator,
    primary_ip_family=ipcls.family,
    prealloc_wipe_disks=prealloc_wipe_disks,
    use_external_mip_script=use_external_mip_script,
    ipolicy=full_ipolicy,
    hv_state_static=hv_state,
    disk_state_static=disk_state,
    )
  master_node_config = objects.Node(name=hostname.name,
                                    primary_ip=hostname.ip,
                                    secondary_ip=secondary_ip,
                                    serial_no=1,
                                    master_candidate=True,
                                    offline=False, drained=False,
                                    ctime=now, mtime=now,
                                    )
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
  cfg = config.ConfigWriter(offline=True)
  ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  backend.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  _InitGanetiServerSetup(hostname.name)

  logging.debug("Starting daemons")
  result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
  if result.failed:
    raise errors.OpExecError("Could not start daemons, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForMasterDaemon()


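# Illustrative invocation (added commentary; all parameter values below are
# hypothetical and would normally be assembled by the "gnt-cluster init"
# command-line frontend before calling InitCluster):
#
#   InitCluster("cluster.example.com", "aa:00:00",
#               master_netmask=None, master_netdev="eth0",
#               file_storage_dir="/srv/ganeti/file-storage",
#               shared_file_storage_dir="/srv/ganeti/shared-file-storage",
#               candidate_pool_size=10, vg_name="xenvg",
#               beparams=dict(constants.BEC_DEFAULTS),
#               nicparams=dict(constants.NICC_DEFAULTS),
#               hvparams={constants.HT_XEN_PVM:
#                         dict(constants.HVC_DEFAULTS[constants.HT_XEN_PVM])},
#               diskparams={}, enabled_hypervisors=[constants.HT_XEN_PVM],
#               primary_ip_version=constants.IP4_VERSION)

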
def InitConfig(version, cluster_config, master_node_config,
               cfg_file=constants.CLUSTER_CONF_FILE):
  """Create the initial cluster configuration.

  It will contain the current node, which will also be the master
  node, and no instances.

  @type version: int
  @param version: configuration version
  @type cluster_config: L{objects.Cluster}
  @param cluster_config: cluster configuration
  @type master_node_config: L{objects.Node}
  @param master_node_config: master node configuration
  @type cfg_file: string
  @param cfg_file: configuration file path

  """
  uuid_generator = config.TemporaryReservationManager()
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                _INITCONF_ECID)
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                    _INITCONF_ECID)
  nodes = {
    master_node_config.name: master_node_config,
    }
  default_nodegroup = objects.NodeGroup(
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
    name=constants.INITIAL_NODE_GROUP_NAME,
    members=[master_node_config.name],
    diskparams={},
    )
  nodegroups = {
    default_nodegroup.uuid: default_nodegroup,
    }
  now = time.time()
  config_data = objects.ConfigData(version=version,
                                   cluster=cluster_config,
                                   nodegroups=nodegroups,
                                   nodes=nodes,
                                   instances={},
                                   serial_no=1,
                                   ctime=now, mtime=now)
  utils.WriteFile(cfg_file,
                  data=serializer.Dump(config_data.ToDict()),
                  mode=0600)


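# Note (added commentary): the resulting file is the serialized form of
# objects.ConfigData, written via serializer.Dump with mode 0600 to
# constants.CLUSTER_CONF_FILE by default; InitCluster above calls this with
# the freshly built cluster and master node objects.

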
def FinalizeClusterDestroy(master):
  """Execute the last steps of cluster destroy.

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  """
  cfg = config.ConfigWriter()
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
  runner = rpc.BootstrapRunner()

  master_params = cfg.GetMasterNetworkParameters()
  master_params.name = master
  ems = cfg.GetUseExternalMipScript()
  result = runner.call_node_deactivate_master_ip(master_params.name,
                                                 master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master IP: %s", msg)

  result = runner.call_node_stop_master(master)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master role: %s", msg)

  result = runner.call_node_leave_cluster(master, modify_ssh_setup)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not shut down the node daemon and clean up"
                    " the node: %s", msg)


def SetupNodeDaemon(cluster_name, node, ssh_key_check):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_key_check: whether to do a strict key check

  """
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  sshrunner = ssh.SshRunner(cluster_name,
                            ipv6=(family == netutils.IP6Address.family))

  bind_address = constants.IP4_ADDRESS_ANY
  if family == netutils.IP6Address.family:
    bind_address = constants.IP6_ADDRESS_ANY

  # set up the inter-node password and certificate, restart the node daemon
  # and then connect with ssh to set password and start ganeti-noded
  # note that all the below variables are sanitized at this point,
  # either by being constants or by the checks above
  sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
  sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
  sshrunner.CopyFileToNode(node, constants.SPICE_CERT_FILE)
  sshrunner.CopyFileToNode(node, constants.SPICE_CACERT_FILE)
  sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
  mycommand = ("%s stop-all; %s start %s -b %s" %
               (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
                utils.ShellQuote(bind_address)))

  result = sshrunner.Run(node, "root", mycommand, batch=False,
                         ask_key=ssh_key_check,
                         use_cluster_key=True,
                         strict_host_check=ssh_key_check)
  if result.failed:
    raise errors.OpExecError("Remote command on node %s, error: %s,"
                             " output: %s" %
                             (node, result.fail_reason, result.output))

  _WaitForNodeDaemon(node)


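# Illustrative usage (added commentary; host names are hypothetical):
# preparing a new node before the add-node opcode is submitted, with strict
# SSH host-key checking enabled.
#
#   SetupNodeDaemon("cluster.example.com", "node3.example.com", True)

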
def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the current node to become
  the new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
                      (dangerous)

  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_list = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  if old_master == new_master:
    raise errors.OpPrereqError("This command must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ("\n".join(mc_no_master)),
                               errors.ECODE_STATE)

  if not no_voting:
    vote_list = GatherMasterVotes(node_list)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # instantiate a real config writer, as we now know we have the
    # configuration data
    cfg = config.ConfigWriter(accept_foreign=True)

    cluster_info = cfg.GetClusterInfo()
    cluster_info.master_node = new_master
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1

  # if cfg.Update worked, then it means the old master daemon won't be
  # able now to write its own config file (we rely on locking in both
  # backend.UploadFile() and ConfigWriter._Write()); hence the next
  # step is to kill the old master

  logging.info("Stopping the master daemon on node %s", old_master)

  runner = rpc.BootstrapRunner()
  master_params = cfg.GetMasterNetworkParameters()
  master_params.name = old_master
  ems = cfg.GetUseExternalMipScript()
  result = runner.call_node_deactivate_master_ip(master_params.name,
                                                 master_params, ems)

  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master IP: %s", msg)

  result = runner.call_node_stop_master(old_master)
  msg = result.fail_msg
  if msg:
    logging.error("Could not disable the master role on the old master"
                  " %s, please disable manually: %s", old_master, msg)

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30

  # Here we have a phase where no master should be running
  def _check_ip():
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
      raise utils.RetryAgain()

  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
  except utils.RetryTimeout:
    logging.warning("The master IP is still reachable after %s seconds,"
                    " continuing but activating the master on the current"
                    " node will probably fail", total_timeout)

  if jstore.CheckDrainFlag():
    logging.info("Undraining job queue")
    jstore.SetDrainFlag(False)

  logging.info("Starting the master daemons on the new master")

  result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
                                                                no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode


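# Note (added commentary): this function backs the "gnt-cluster
# master-failover" command and must run on the node that should become the
# new master. Passing no_voting=True (the --no-voting option) skips the
# GatherMasterVotes check and is dangerous if the old master is still alive.

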
def GetMaster():
  """Returns the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing ssconf directly, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  """
  sstore = ssconf.SimpleStore()

  old_master, _ = ssconf.GetMasterAndMyself(sstore)

  return old_master


def GatherMasterVotes(node_list):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  We remove ourselves from the list since we know that (bugs aside)
  we use the same source of configuration information for both backend
  and bootstrap, so we'll always vote for ourselves.

  @type node_list: list
  @param node_list: the list of nodes to query for master info; the current
      node will be removed if it is in the list
  @rtype: list
  @return: list of (node, votes)

  """
  myself = netutils.Hostname.GetSysName()
  try:
    node_list.remove(myself)
  except ValueError:
    pass
  if not node_list:
    # no nodes left (possibly after removing ourselves)
    return []
  results = rpc.BootstrapRunner().call_master_info(node_list)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_list))]
  votes = {}
  for node in results:
    nres = results[node]
    data = nres.payload
    msg = nres.fail_msg
    fail = False
    if msg:
      logging.warning("Error contacting node %s: %s", node, msg)
      fail = True
    # for now we accept replies of length 3, 4 and 5 (data[3] is the primary
    # ip version and data[4] is the master netmask)
    elif not isinstance(data, (tuple, list)) or len(data) < 3:
      logging.warning("Invalid data received from node %s: %s", node, data)
      fail = True
    if fail:
      if None not in votes:
        votes[None] = 0
      votes[None] += 1
      continue
    master_node = data[2]
    if master_node not in votes:
      votes[master_node] = 0
    votes[master_node] += 1

  vote_list = votes.items()
  # sort first on number of votes then on name, since we want None sorted
  # last if half of the nodes do not respond and the other half all vote
  # for the same master
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)

  return vote_list
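

# Worked example (added commentary): querying five other nodes, of which
# three report "node1" as master, one reports "node2" and one fails to
# answer, yields votes == {"node1": 3, "node2": 1, None: 1} and a return
# value of [("node1", 3), ("node2", 1), (None, 1)]. MasterFailover above
# only trusts the first entry when its node is not None.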