Factorize running node setup command
[ganeti-local] / lib / bootstrap.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions to bootstrap a new cluster.
23
24 """
25
26 import os
27 import os.path
28 import re
29 import logging
30 import time
31 import tempfile
32
33 from ganeti import rpc
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import config
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import ssconf
41 from ganeti import serializer
42 from ganeti import hypervisor
43 from ganeti import bdev
44 from ganeti import netutils
45 from ganeti import luxi
46 from ganeti import jstore
47 from ganeti import pathutils
48
49
50 # ec_id for InitConfig's temporary reservation manager
51 _INITCONF_ECID = "initconfig-ecid"
52
53 #: After how many seconds daemon must be responsive
54 _DAEMON_READY_TIMEOUT = 10.0
55
56
57 def _InitSSHSetup():
58   """Setup the SSH configuration for the cluster.
59
60   This generates a dsa keypair for root, adds the pub key to the
61   permitted hosts and adds the hostkey to its own known hosts.
62
63   """
64   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
65
66   for name in priv_key, pub_key:
67     if os.path.exists(name):
68       utils.CreateBackup(name)
69     utils.RemoveFile(name)
70
71   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
72                          "-f", priv_key,
73                          "-q", "-N", ""])
74   if result.failed:
75     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
76                              result.output)
77
78   utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
79
80
81 def GenerateHmacKey(file_name):
82   """Writes a new HMAC key.
83
84   @type file_name: str
85   @param file_name: Path to output file
86
87   """
88   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
89                   backup=True)
90
91
92 def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
93                           new_confd_hmac_key, new_cds,
94                           rapi_cert_pem=None, spice_cert_pem=None,
95                           spice_cacert_pem=None, cds=None,
96                           nodecert_file=pathutils.NODED_CERT_FILE,
97                           rapicert_file=pathutils.RAPI_CERT_FILE,
98                           spicecert_file=pathutils.SPICE_CERT_FILE,
99                           spicecacert_file=pathutils.SPICE_CACERT_FILE,
100                           hmackey_file=pathutils.CONFD_HMAC_KEY,
101                           cds_file=pathutils.CLUSTER_DOMAIN_SECRET_FILE):
102   """Updates the cluster certificates, keys and secrets.
103
104   @type new_cluster_cert: bool
105   @param new_cluster_cert: Whether to generate a new cluster certificate
106   @type new_rapi_cert: bool
107   @param new_rapi_cert: Whether to generate a new RAPI certificate
108   @type new_spice_cert: bool
109   @param new_spice_cert: Whether to generate a new SPICE certificate
110   @type new_confd_hmac_key: bool
111   @param new_confd_hmac_key: Whether to generate a new HMAC key
112   @type new_cds: bool
113   @param new_cds: Whether to generate a new cluster domain secret
114   @type rapi_cert_pem: string
115   @param rapi_cert_pem: New RAPI certificate in PEM format
116   @type spice_cert_pem: string
117   @param spice_cert_pem: New SPICE certificate in PEM format
118   @type spice_cacert_pem: string
119   @param spice_cacert_pem: Certificate of the CA that signed the SPICE
120                            certificate, in PEM format
121   @type cds: string
122   @param cds: New cluster domain secret
123   @type nodecert_file: string
124   @param nodecert_file: optional override of the node cert file path
125   @type rapicert_file: string
126   @param rapicert_file: optional override of the rapi cert file path
127   @type spicecert_file: string
128   @param spicecert_file: optional override of the spice cert file path
129   @type spicecacert_file: string
130   @param spicecacert_file: optional override of the spice CA cert file path
131   @type hmackey_file: string
132   @param hmackey_file: optional override of the hmac key file path
133
134   """
135   # noded SSL certificate
136   cluster_cert_exists = os.path.exists(nodecert_file)
137   if new_cluster_cert or not cluster_cert_exists:
138     if cluster_cert_exists:
139       utils.CreateBackup(nodecert_file)
140
141     logging.debug("Generating new cluster certificate at %s", nodecert_file)
142     utils.GenerateSelfSignedSslCert(nodecert_file)
143
144   # confd HMAC key
145   if new_confd_hmac_key or not os.path.exists(hmackey_file):
146     logging.debug("Writing new confd HMAC key to %s", hmackey_file)
147     GenerateHmacKey(hmackey_file)
148
149   # RAPI
150   rapi_cert_exists = os.path.exists(rapicert_file)
151
152   if rapi_cert_pem:
153     # Assume rapi_pem contains a valid PEM-formatted certificate and key
154     logging.debug("Writing RAPI certificate at %s", rapicert_file)
155     utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
156
157   elif new_rapi_cert or not rapi_cert_exists:
158     if rapi_cert_exists:
159       utils.CreateBackup(rapicert_file)
160
161     logging.debug("Generating new RAPI certificate at %s", rapicert_file)
162     utils.GenerateSelfSignedSslCert(rapicert_file)
163
164   # SPICE
165   spice_cert_exists = os.path.exists(spicecert_file)
166   spice_cacert_exists = os.path.exists(spicecacert_file)
167   if spice_cert_pem:
168     # spice_cert_pem implies also spice_cacert_pem
169     logging.debug("Writing SPICE certificate at %s", spicecert_file)
170     utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
171     logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
172     utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
173   elif new_spice_cert or not spice_cert_exists:
174     if spice_cert_exists:
175       utils.CreateBackup(spicecert_file)
176     if spice_cacert_exists:
177       utils.CreateBackup(spicecacert_file)
178
179     logging.debug("Generating new self-signed SPICE certificate at %s",
180                   spicecert_file)
181     (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file)
182
183     # Self-signed certificate -> the public certificate is also the CA public
184     # certificate
185     logging.debug("Writing the public certificate to %s",
186                   spicecert_file)
187     utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)
188
189   # Cluster domain secret
190   if cds:
191     logging.debug("Writing cluster domain secret to %s", cds_file)
192     utils.WriteFile(cds_file, data=cds, backup=True)
193
194   elif new_cds or not os.path.exists(cds_file):
195     logging.debug("Generating new cluster domain secret at %s", cds_file)
196     GenerateHmacKey(cds_file)
197
198
199 def _InitGanetiServerSetup(master_name):
200   """Setup the necessary configuration for the initial node daemon.
201
202   This creates the nodepass file containing the shared password for
203   the cluster, generates the SSL certificate and starts the node daemon.
204
205   @type master_name: str
206   @param master_name: Name of the master node
207
208   """
209   # Generate cluster secrets
210   GenerateClusterCrypto(True, False, False, False, False)
211
212   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.NODED])
213   if result.failed:
214     raise errors.OpExecError("Could not start the node daemon, command %s"
215                              " had exitcode %s and error %s" %
216                              (result.cmd, result.exit_code, result.output))
217
218   _WaitForNodeDaemon(master_name)
219
220
221 def _WaitForNodeDaemon(node_name):
222   """Wait for node daemon to become responsive.
223
224   """
225   def _CheckNodeDaemon():
226     # Pylint bug <http://www.logilab.org/ticket/35642>
227     # pylint: disable=E1101
228     result = rpc.BootstrapRunner().call_version([node_name])[node_name]
229     if result.fail_msg:
230       raise utils.RetryAgain()
231
232   try:
233     utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
234   except utils.RetryTimeout:
235     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
236                              " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
237
238
239 def _WaitForMasterDaemon():
240   """Wait for master daemon to become responsive.
241
242   """
243   def _CheckMasterDaemon():
244     try:
245       cl = luxi.Client()
246       (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
247     except Exception:
248       raise utils.RetryAgain()
249
250     logging.debug("Received cluster name %s from master", cluster_name)
251
252   try:
253     utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
254   except utils.RetryTimeout:
255     raise errors.OpExecError("Master daemon didn't answer queries within"
256                              " %s seconds" % _DAEMON_READY_TIMEOUT)
257
258
259 def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
260                     use_cluster_key, ask_key, strict_host_check, data):
261   """Runs a command to configure something on a remote machine.
262
263   @type cluster_name: string
264   @param cluster_name: Cluster name
265   @type node: string
266   @param node: Node name
267   @type basecmd: string
268   @param basecmd: Base command (path on the remote machine)
269   @type debug: bool
270   @param debug: Enable debug output
271   @type verbose: bool
272   @param verbose: Enable verbose output
273   @type use_cluster_key: bool
274   @param use_cluster_key: See L{ssh.SshRunner.BuildCmd}
275   @type ask_key: bool
276   @param ask_key: See L{ssh.SshRunner.BuildCmd}
277   @type strict_host_check: bool
278   @param strict_host_check: See L{ssh.SshRunner.BuildCmd}
279   @param data: JSON-serializable input data for script (passed to stdin)
280
281   """
282   cmd = [basecmd]
283
284   # Pass --debug/--verbose to the external script if set on our invocation
285   if debug:
286     cmd.append("--debug")
287
288   if verbose:
289     cmd.append("--verbose")
290
291   srun = ssh.SshRunner(cluster_name)
292   scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
293                        utils.ShellQuoteArgs(cmd),
294                        batch=False, ask_key=ask_key, quiet=False,
295                        strict_host_check=strict_host_check,
296                        use_cluster_key=use_cluster_key)
297
298   tempfh = tempfile.TemporaryFile()
299   try:
300     tempfh.write(serializer.DumpJson(data))
301     tempfh.seek(0)
302
303     result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
304   finally:
305     tempfh.close()
306
307   if result.failed:
308     raise errors.OpExecError("Command '%s' failed: %s" %
309                              (result.cmd, result.fail_reason))
310
311
312 def _InitFileStorage(file_storage_dir):
313   """Initialize if needed the file storage.
314
315   @param file_storage_dir: the user-supplied value
316   @return: either empty string (if file storage was disabled at build
317       time) or the normalized path to the storage directory
318
319   """
320   file_storage_dir = os.path.normpath(file_storage_dir)
321
322   if not os.path.isabs(file_storage_dir):
323     raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
324                                " path" % file_storage_dir, errors.ECODE_INVAL)
325
326   if not os.path.exists(file_storage_dir):
327     try:
328       os.makedirs(file_storage_dir, 0750)
329     except OSError, err:
330       raise errors.OpPrereqError("Cannot create file storage directory"
331                                  " '%s': %s" % (file_storage_dir, err),
332                                  errors.ECODE_ENVIRON)
333
334   if not os.path.isdir(file_storage_dir):
335     raise errors.OpPrereqError("The file storage directory '%s' is not"
336                                " a directory." % file_storage_dir,
337                                errors.ECODE_ENVIRON)
338   return file_storage_dir
339
340
341 def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
342                 master_netmask, master_netdev, file_storage_dir,
343                 shared_file_storage_dir, candidate_pool_size, secondary_ip=None,
344                 vg_name=None, beparams=None, nicparams=None, ndparams=None,
345                 hvparams=None, diskparams=None, enabled_hypervisors=None,
346                 modify_etc_hosts=True, modify_ssh_setup=True,
347                 maintain_node_health=False, drbd_helper=None, uid_pool=None,
348                 default_iallocator=None, primary_ip_version=None, ipolicy=None,
349                 prealloc_wipe_disks=False, use_external_mip_script=False,
350                 hv_state=None, disk_state=None):
351   """Initialise the cluster.
352
353   @type candidate_pool_size: int
354   @param candidate_pool_size: master candidate pool size
355
356   """
357   # TODO: complete the docstring
358   if config.ConfigWriter.IsCluster():
359     raise errors.OpPrereqError("Cluster is already initialised",
360                                errors.ECODE_STATE)
361
362   if not enabled_hypervisors:
363     raise errors.OpPrereqError("Enabled hypervisors list must contain at"
364                                " least one member", errors.ECODE_INVAL)
365   invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
366   if invalid_hvs:
367     raise errors.OpPrereqError("Enabled hypervisors contains invalid"
368                                " entries: %s" % invalid_hvs,
369                                errors.ECODE_INVAL)
370
371   try:
372     ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
373   except errors.ProgrammerError:
374     raise errors.OpPrereqError("Invalid primary ip version: %d." %
375                                primary_ip_version, errors.ECODE_INVAL)
376
377   hostname = netutils.GetHostname(family=ipcls.family)
378   if not ipcls.IsValid(hostname.ip):
379     raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
380                                " address." % (hostname.ip, primary_ip_version),
381                                errors.ECODE_INVAL)
382
383   if ipcls.IsLoopback(hostname.ip):
384     raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
385                                " address. Please fix DNS or %s." %
386                                (hostname.ip, pathutils.ETC_HOSTS),
387                                errors.ECODE_ENVIRON)
388
389   if not ipcls.Own(hostname.ip):
390     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
391                                " to %s,\nbut this ip address does not"
392                                " belong to this host" %
393                                hostname.ip, errors.ECODE_ENVIRON)
394
395   clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
396
397   if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
398     raise errors.OpPrereqError("Cluster IP already active",
399                                errors.ECODE_NOTUNIQUE)
400
401   if not secondary_ip:
402     if primary_ip_version == constants.IP6_VERSION:
403       raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
404                                  " IPv4 address must be given as secondary",
405                                  errors.ECODE_INVAL)
406     secondary_ip = hostname.ip
407
408   if not netutils.IP4Address.IsValid(secondary_ip):
409     raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
410                                " IPv4 address." % secondary_ip,
411                                errors.ECODE_INVAL)
412
413   if not netutils.IP4Address.Own(secondary_ip):
414     raise errors.OpPrereqError("You gave %s as secondary IP,"
415                                " but it does not belong to this host." %
416                                secondary_ip, errors.ECODE_ENVIRON)
417
418   if master_netmask is not None:
419     if not ipcls.ValidateNetmask(master_netmask):
420       raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s " %
421                                   (master_netmask, primary_ip_version),
422                                  errors.ECODE_INVAL)
423   else:
424     master_netmask = ipcls.iplen
425
426   if vg_name is not None:
427     # Check if volume group is valid
428     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
429                                           constants.MIN_VG_SIZE)
430     if vgstatus:
431       raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
432                                  " you are not using lvm" % vgstatus,
433                                  errors.ECODE_INVAL)
434
435   if drbd_helper is not None:
436     try:
437       curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
438     except errors.BlockDeviceError, err:
439       raise errors.OpPrereqError("Error while checking drbd helper"
440                                  " (specify --no-drbd-storage if you are not"
441                                  " using drbd): %s" % str(err),
442                                  errors.ECODE_ENVIRON)
443     if drbd_helper != curr_helper:
444       raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
445                                  " is the current helper" % (drbd_helper,
446                                                              curr_helper),
447                                  errors.ECODE_INVAL)
448
449   if constants.ENABLE_FILE_STORAGE:
450     file_storage_dir = _InitFileStorage(file_storage_dir)
451   else:
452     file_storage_dir = ""
453
454   if constants.ENABLE_SHARED_FILE_STORAGE:
455     shared_file_storage_dir = _InitFileStorage(shared_file_storage_dir)
456   else:
457     shared_file_storage_dir = ""
458
459   if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
460     raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
461                                errors.ECODE_INVAL)
462
463   result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
464   if result.failed:
465     raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
466                                (master_netdev,
467                                 result.output.strip()), errors.ECODE_INVAL)
468
469   dirs = [(pathutils.RUN_DIR, constants.RUN_DIRS_MODE)]
470   utils.EnsureDirs(dirs)
471
472   objects.UpgradeBeParams(beparams)
473   utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
474   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
475
476   objects.NIC.CheckParameterSyntax(nicparams)
477
478   full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)
479
480   if ndparams is not None:
481     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
482   else:
483     ndparams = dict(constants.NDC_DEFAULTS)
484
485   # This is ugly, as we modify the dict itself
486   # FIXME: Make utils.ForceDictType pure functional or write a wrapper
487   # around it
488   if hv_state:
489     for hvname, hvs_data in hv_state.items():
490       utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
491       hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
492   else:
493     hv_state = dict((hvname, constants.HVST_DEFAULTS)
494                     for hvname in enabled_hypervisors)
495
496   # FIXME: disk_state has no default values yet
497   if disk_state:
498     for storage, ds_data in disk_state.items():
499       if storage not in constants.DS_VALID_TYPES:
500         raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
501                                    storage, errors.ECODE_INVAL)
502       for ds_name, state in ds_data.items():
503         utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
504         ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)
505
506   # hvparams is a mapping of hypervisor->hvparams dict
507   for hv_name, hv_params in hvparams.iteritems():
508     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
509     hv_class = hypervisor.GetHypervisor(hv_name)
510     hv_class.CheckParameterSyntax(hv_params)
511
512   # diskparams is a mapping of disk-template->diskparams dict
513   for template, dt_params in diskparams.items():
514     param_keys = set(dt_params.keys())
515     default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
516     if not (param_keys <= default_param_keys):
517       unknown_params = param_keys - default_param_keys
518       raise errors.OpPrereqError("Invalid parameters for disk template %s:"
519                                  " %s" % (template,
520                                           utils.CommaJoin(unknown_params)),
521                                  errors.ECODE_INVAL)
522     utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
523   try:
524     utils.VerifyDictOptions(diskparams, constants.DISK_DT_DEFAULTS)
525   except errors.OpPrereqError, err:
526     raise errors.OpPrereqError("While verify diskparam options: %s" % err,
527                                errors.ECODE_INVAL)
528
529   # set up ssh config and /etc/hosts
530   sshline = utils.ReadFile(pathutils.SSH_HOST_RSA_PUB)
531   sshkey = sshline.split(" ")[1]
532
533   if modify_etc_hosts:
534     utils.AddHostToEtcHosts(hostname.name, hostname.ip)
535
536   if modify_ssh_setup:
537     _InitSSHSetup()
538
539   if default_iallocator is not None:
540     alloc_script = utils.FindFile(default_iallocator,
541                                   constants.IALLOCATOR_SEARCH_PATH,
542                                   os.path.isfile)
543     if alloc_script is None:
544       raise errors.OpPrereqError("Invalid default iallocator script '%s'"
545                                  " specified" % default_iallocator,
546                                  errors.ECODE_INVAL)
547   elif constants.HTOOLS:
548     # htools was enabled at build-time, we default to it
549     if utils.FindFile(constants.IALLOC_HAIL,
550                       constants.IALLOCATOR_SEARCH_PATH,
551                       os.path.isfile):
552       default_iallocator = constants.IALLOC_HAIL
553
554   now = time.time()
555
556   # init of cluster config file
557   cluster_config = objects.Cluster(
558     serial_no=1,
559     rsahostkeypub=sshkey,
560     highest_used_port=(constants.FIRST_DRBD_PORT - 1),
561     mac_prefix=mac_prefix,
562     volume_group_name=vg_name,
563     tcpudp_port_pool=set(),
564     master_node=hostname.name,
565     master_ip=clustername.ip,
566     master_netmask=master_netmask,
567     master_netdev=master_netdev,
568     cluster_name=clustername.name,
569     file_storage_dir=file_storage_dir,
570     shared_file_storage_dir=shared_file_storage_dir,
571     enabled_hypervisors=enabled_hypervisors,
572     beparams={constants.PP_DEFAULT: beparams},
573     nicparams={constants.PP_DEFAULT: nicparams},
574     ndparams=ndparams,
575     hvparams=hvparams,
576     diskparams=diskparams,
577     candidate_pool_size=candidate_pool_size,
578     modify_etc_hosts=modify_etc_hosts,
579     modify_ssh_setup=modify_ssh_setup,
580     uid_pool=uid_pool,
581     ctime=now,
582     mtime=now,
583     maintain_node_health=maintain_node_health,
584     drbd_usermode_helper=drbd_helper,
585     default_iallocator=default_iallocator,
586     primary_ip_family=ipcls.family,
587     prealloc_wipe_disks=prealloc_wipe_disks,
588     use_external_mip_script=use_external_mip_script,
589     ipolicy=full_ipolicy,
590     hv_state_static=hv_state,
591     disk_state_static=disk_state,
592     )
593   master_node_config = objects.Node(name=hostname.name,
594                                     primary_ip=hostname.ip,
595                                     secondary_ip=secondary_ip,
596                                     serial_no=1,
597                                     master_candidate=True,
598                                     offline=False, drained=False,
599                                     ctime=now, mtime=now,
600                                     )
601   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
602   cfg = config.ConfigWriter(offline=True)
603   ssh.WriteKnownHostsFile(cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
604   cfg.Update(cfg.GetClusterInfo(), logging.error)
605   ssconf.WriteSsconfFiles(cfg.GetSsconfValues())
606
607   # set up the inter-node password and certificate
608   _InitGanetiServerSetup(hostname.name)
609
610   logging.debug("Starting daemons")
611   result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
612   if result.failed:
613     raise errors.OpExecError("Could not start daemons, command %s"
614                              " had exitcode %s and error %s" %
615                              (result.cmd, result.exit_code, result.output))
616
617   _WaitForMasterDaemon()
618
619
620 def InitConfig(version, cluster_config, master_node_config,
621                cfg_file=pathutils.CLUSTER_CONF_FILE):
622   """Create the initial cluster configuration.
623
624   It will contain the current node, which will also be the master
625   node, and no instances.
626
627   @type version: int
628   @param version: configuration version
629   @type cluster_config: L{objects.Cluster}
630   @param cluster_config: cluster configuration
631   @type master_node_config: L{objects.Node}
632   @param master_node_config: master node configuration
633   @type cfg_file: string
634   @param cfg_file: configuration file path
635
636   """
637   uuid_generator = config.TemporaryReservationManager()
638   cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
639                                                 _INITCONF_ECID)
640   master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
641                                                     _INITCONF_ECID)
642   nodes = {
643     master_node_config.name: master_node_config,
644     }
645   default_nodegroup = objects.NodeGroup(
646     uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
647     name=constants.INITIAL_NODE_GROUP_NAME,
648     members=[master_node_config.name],
649     diskparams={},
650     )
651   nodegroups = {
652     default_nodegroup.uuid: default_nodegroup,
653     }
654   now = time.time()
655   config_data = objects.ConfigData(version=version,
656                                    cluster=cluster_config,
657                                    nodegroups=nodegroups,
658                                    nodes=nodes,
659                                    instances={},
660                                    networks={},
661                                    serial_no=1,
662                                    ctime=now, mtime=now)
663   utils.WriteFile(cfg_file,
664                   data=serializer.Dump(config_data.ToDict()),
665                   mode=0600)
666
667
668 def FinalizeClusterDestroy(master):
669   """Execute the last steps of cluster destroy
670
671   This function shuts down all the daemons, completing the destroy
672   begun in cmdlib.LUDestroyOpcode.
673
674   """
675   cfg = config.ConfigWriter()
676   modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
677   runner = rpc.BootstrapRunner()
678
679   master_params = cfg.GetMasterNetworkParameters()
680   master_params.name = master
681   ems = cfg.GetUseExternalMipScript()
682   result = runner.call_node_deactivate_master_ip(master_params.name,
683                                                  master_params, ems)
684
685   msg = result.fail_msg
686   if msg:
687     logging.warning("Could not disable the master IP: %s", msg)
688
689   result = runner.call_node_stop_master(master)
690   msg = result.fail_msg
691   if msg:
692     logging.warning("Could not disable the master role: %s", msg)
693
694   result = runner.call_node_leave_cluster(master, modify_ssh_setup)
695   msg = result.fail_msg
696   if msg:
697     logging.warning("Could not shutdown the node daemon and cleanup"
698                     " the node: %s", msg)
699
700
701 def SetupNodeDaemon(cluster_name, node, ssh_key_check):
702   """Add a node to the cluster.
703
704   This function must be called before the actual opcode, and will ssh
705   to the remote node, copy the needed files, and start ganeti-noded,
706   allowing the master to do the rest via normal rpc calls.
707
708   @param cluster_name: the cluster name
709   @param node: the name of the new node
710   @param ssh_key_check: whether to do a strict key check
711
712   """
713   sstore = ssconf.SimpleStore()
714   family = sstore.GetPrimaryIPFamily()
715   sshrunner = ssh.SshRunner(cluster_name,
716                             ipv6=(family == netutils.IP6Address.family))
717
718   # set up inter-node password and certificate and restarts the node daemon
719   # and then connect with ssh to set password and start ganeti-noded
720   # note that all the below variables are sanitized at this point,
721   # either by being constants or by the checks above
722   sshrunner.CopyFileToNode(node, pathutils.NODED_CERT_FILE)
723   sshrunner.CopyFileToNode(node, pathutils.RAPI_CERT_FILE)
724   sshrunner.CopyFileToNode(node, pathutils.SPICE_CERT_FILE)
725   sshrunner.CopyFileToNode(node, pathutils.SPICE_CACERT_FILE)
726   sshrunner.CopyFileToNode(node, pathutils.CONFD_HMAC_KEY)
727   for filename in sstore.GetFileList():
728     sshrunner.CopyFileToNode(node, filename)
729   mycommand = ("%s stop-all; %s start %s" %
730                (pathutils.DAEMON_UTIL, pathutils.DAEMON_UTIL, constants.NODED))
731
732   result = sshrunner.Run(node, constants.SSH_LOGIN_USER, mycommand, batch=False,
733                          ask_key=ssh_key_check,
734                          use_cluster_key=True,
735                          strict_host_check=ssh_key_check)
736   if result.failed:
737     raise errors.OpExecError("Remote command on node %s, error: %s,"
738                              " output: %s" %
739                              (node, result.fail_reason, result.output))
740
741   _WaitForNodeDaemon(node)
742
743
744 def MasterFailover(no_voting=False):
745   """Failover the master node.
746
747   This checks that we are not already the master, and will cause the
748   current master to cease being master, and the non-master to become
749   new master.
750
751   @type no_voting: boolean
752   @param no_voting: force the operation without remote nodes agreement
753                       (dangerous)
754
755   """
756   sstore = ssconf.SimpleStore()
757
758   old_master, new_master = ssconf.GetMasterAndMyself(sstore)
759   node_list = sstore.GetNodeList()
760   mc_list = sstore.GetMasterCandidates()
761
762   if old_master == new_master:
763     raise errors.OpPrereqError("This commands must be run on the node"
764                                " where you want the new master to be."
765                                " %s is already the master" %
766                                old_master, errors.ECODE_INVAL)
767
768   if new_master not in mc_list:
769     mc_no_master = [name for name in mc_list if name != old_master]
770     raise errors.OpPrereqError("This node is not among the nodes marked"
771                                " as master candidates. Only these nodes"
772                                " can become masters. Current list of"
773                                " master candidates is:\n"
774                                "%s" % ("\n".join(mc_no_master)),
775                                errors.ECODE_STATE)
776
777   if not no_voting:
778     vote_list = GatherMasterVotes(node_list)
779
780     if vote_list:
781       voted_master = vote_list[0][0]
782       if voted_master is None:
783         raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
784                                    " not respond.", errors.ECODE_ENVIRON)
785       elif voted_master != old_master:
786         raise errors.OpPrereqError("I have a wrong configuration, I believe"
787                                    " the master is %s but the other nodes"
788                                    " voted %s. Please resync the configuration"
789                                    " of this node." %
790                                    (old_master, voted_master),
791                                    errors.ECODE_STATE)
792   # end checks
793
794   rcode = 0
795
796   logging.info("Setting master to %s, old master: %s", new_master, old_master)
797
798   try:
799     # instantiate a real config writer, as we now know we have the
800     # configuration data
801     cfg = config.ConfigWriter(accept_foreign=True)
802
803     cluster_info = cfg.GetClusterInfo()
804     cluster_info.master_node = new_master
805     # this will also regenerate the ssconf files, since we updated the
806     # cluster info
807     cfg.Update(cluster_info, logging.error)
808   except errors.ConfigurationError, err:
809     logging.error("Error while trying to set the new master: %s",
810                   str(err))
811     return 1
812
813   # if cfg.Update worked, then it means the old master daemon won't be
814   # able now to write its own config file (we rely on locking in both
815   # backend.UploadFile() and ConfigWriter._Write(); hence the next
816   # step is to kill the old master
817
818   logging.info("Stopping the master daemon on node %s", old_master)
819
820   runner = rpc.BootstrapRunner()
821   master_params = cfg.GetMasterNetworkParameters()
822   master_params.name = old_master
823   ems = cfg.GetUseExternalMipScript()
824   result = runner.call_node_deactivate_master_ip(master_params.name,
825                                                  master_params, ems)
826
827   msg = result.fail_msg
828   if msg:
829     logging.warning("Could not disable the master IP: %s", msg)
830
831   result = runner.call_node_stop_master(old_master)
832   msg = result.fail_msg
833   if msg:
834     logging.error("Could not disable the master role on the old master"
835                   " %s, please disable manually: %s", old_master, msg)
836
837   logging.info("Checking master IP non-reachability...")
838
839   master_ip = sstore.GetMasterIP()
840   total_timeout = 30
841
842   # Here we have a phase where no master should be running
843   def _check_ip():
844     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
845       raise utils.RetryAgain()
846
847   try:
848     utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
849   except utils.RetryTimeout:
850     logging.warning("The master IP is still reachable after %s seconds,"
851                     " continuing but activating the master on the current"
852                     " node will probably fail", total_timeout)
853
854   if jstore.CheckDrainFlag():
855     logging.info("Undraining job queue")
856     jstore.SetDrainFlag(False)
857
858   logging.info("Starting the master daemons on the new master")
859
860   result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
861                                                                 no_voting)
862   msg = result.fail_msg
863   if msg:
864     logging.error("Could not start the master role on the new master"
865                   " %s, please check: %s", new_master, msg)
866     rcode = 1
867
868   logging.info("Master failed over from %s to %s", old_master, new_master)
869   return rcode
870
871
872 def GetMaster():
873   """Returns the current master node.
874
875   This is a separate function in bootstrap since it's needed by
876   gnt-cluster, and instead of importing directly ssconf, it's better
877   to abstract it in bootstrap, where we do use ssconf in other
878   functions too.
879
880   """
881   sstore = ssconf.SimpleStore()
882
883   old_master, _ = ssconf.GetMasterAndMyself(sstore)
884
885   return old_master
886
887
888 def GatherMasterVotes(node_list):
889   """Check the agreement on who is the master.
890
891   This function will return a list of (node, number of votes), ordered
892   by the number of votes. Errors will be denoted by the key 'None'.
893
894   Note that the sum of votes is the number of nodes this machine
895   knows, whereas the number of entries in the list could be different
896   (if some nodes vote for another master).
897
898   We remove ourselves from the list since we know that (bugs aside)
899   since we use the same source for configuration information for both
900   backend and boostrap, we'll always vote for ourselves.
901
902   @type node_list: list
903   @param node_list: the list of nodes to query for master info; the current
904       node will be removed if it is in the list
905   @rtype: list
906   @return: list of (node, votes)
907
908   """
909   myself = netutils.Hostname.GetSysName()
910   try:
911     node_list.remove(myself)
912   except ValueError:
913     pass
914   if not node_list:
915     # no nodes left (eventually after removing myself)
916     return []
917   results = rpc.BootstrapRunner().call_master_info(node_list)
918   if not isinstance(results, dict):
919     # this should not happen (unless internal error in rpc)
920     logging.critical("Can't complete rpc call, aborting master startup")
921     return [(None, len(node_list))]
922   votes = {}
923   for node in results:
924     nres = results[node]
925     data = nres.payload
926     msg = nres.fail_msg
927     fail = False
928     if msg:
929       logging.warning("Error contacting node %s: %s", node, msg)
930       fail = True
931     # for now we accept both length 3, 4 and 5 (data[3] is primary ip version
932     # and data[4] is the master netmask)
933     elif not isinstance(data, (tuple, list)) or len(data) < 3:
934       logging.warning("Invalid data received from node %s: %s", node, data)
935       fail = True
936     if fail:
937       if None not in votes:
938         votes[None] = 0
939       votes[None] += 1
940       continue
941     master_node = data[2]
942     if master_node not in votes:
943       votes[master_node] = 0
944     votes[master_node] += 1
945
946   vote_list = [v for v in votes.items()]
947   # sort first on number of votes then on name, since we want None
948   # sorted later if we have the half of the nodes not responding, and
949   # half voting all for the same master
950   vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
951
952   return vote_list