c72754e75fdf0456bb2d20da23e73881d880fefe
[ganeti-local] / lib / bootstrap.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions to bootstrap a new cluster.
23
24 """
25
26 import os
27 import os.path
28 import re
29 import logging
30 import time
31
32 from ganeti import rpc
33 from ganeti import ssh
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import config
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40 from ganeti import serializer
41 from ganeti import hypervisor
42 from ganeti import bdev
43 from ganeti import netutils
44 from ganeti import backend
45 from ganeti import luxi
46 from ganeti import jstore
47
48
49 # ec_id for InitConfig's temporary reservation manager
50 _INITCONF_ECID = "initconfig-ecid"
51
52 #: After how many seconds daemon must be responsive
53 _DAEMON_READY_TIMEOUT = 10.0
54
55
56 def _InitSSHSetup():
57   """Setup the SSH configuration for the cluster.
58
59   This generates a dsa keypair for root, adds the pub key to the
60   permitted hosts and adds the hostkey to its own known hosts.
61
62   """
63   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
64
65   for name in priv_key, pub_key:
66     if os.path.exists(name):
67       utils.CreateBackup(name)
68     utils.RemoveFile(name)
69
70   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
71                          "-f", priv_key,
72                          "-q", "-N", ""])
73   if result.failed:
74     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
75                              result.output)
76
77   utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
78
79
80 def GenerateHmacKey(file_name):
81   """Writes a new HMAC key.
82
83   @type file_name: str
84   @param file_name: Path to output file
85
86   """
87   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
88                   backup=True)
89
90
91 def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
92                           new_cds, rapi_cert_pem=None, cds=None,
93                           nodecert_file=constants.NODED_CERT_FILE,
94                           rapicert_file=constants.RAPI_CERT_FILE,
95                           hmackey_file=constants.CONFD_HMAC_KEY,
96                           cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
97   """Updates the cluster certificates, keys and secrets.
98
99   @type new_cluster_cert: bool
100   @param new_cluster_cert: Whether to generate a new cluster certificate
101   @type new_rapi_cert: bool
102   @param new_rapi_cert: Whether to generate a new RAPI certificate
103   @type new_confd_hmac_key: bool
104   @param new_confd_hmac_key: Whether to generate a new HMAC key
105   @type new_cds: bool
106   @param new_cds: Whether to generate a new cluster domain secret
107   @type rapi_cert_pem: string
108   @param rapi_cert_pem: New RAPI certificate in PEM format
109   @type cds: string
110   @param cds: New cluster domain secret
111   @type nodecert_file: string
112   @param nodecert_file: optional override of the node cert file path
113   @type rapicert_file: string
114   @param rapicert_file: optional override of the rapi cert file path
115   @type hmackey_file: string
116   @param hmackey_file: optional override of the hmac key file path
117
118   """
119   # noded SSL certificate
120   cluster_cert_exists = os.path.exists(nodecert_file)
121   if new_cluster_cert or not cluster_cert_exists:
122     if cluster_cert_exists:
123       utils.CreateBackup(nodecert_file)
124
125     logging.debug("Generating new cluster certificate at %s", nodecert_file)
126     utils.GenerateSelfSignedSslCert(nodecert_file)
127
128   # confd HMAC key
129   if new_confd_hmac_key or not os.path.exists(hmackey_file):
130     logging.debug("Writing new confd HMAC key to %s", hmackey_file)
131     GenerateHmacKey(hmackey_file)
132
133   # RAPI
134   rapi_cert_exists = os.path.exists(rapicert_file)
135
136   if rapi_cert_pem:
137     # Assume rapi_pem contains a valid PEM-formatted certificate and key
138     logging.debug("Writing RAPI certificate at %s", rapicert_file)
139     utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
140
141   elif new_rapi_cert or not rapi_cert_exists:
142     if rapi_cert_exists:
143       utils.CreateBackup(rapicert_file)
144
145     logging.debug("Generating new RAPI certificate at %s", rapicert_file)
146     utils.GenerateSelfSignedSslCert(rapicert_file)
147
148   # Cluster domain secret
149   if cds:
150     logging.debug("Writing cluster domain secret to %s", cds_file)
151     utils.WriteFile(cds_file, data=cds, backup=True)
152
153   elif new_cds or not os.path.exists(cds_file):
154     logging.debug("Generating new cluster domain secret at %s", cds_file)
155     GenerateHmacKey(cds_file)
156
157
158 def _InitGanetiServerSetup(master_name):
159   """Setup the necessary configuration for the initial node daemon.
160
161   This creates the nodepass file containing the shared password for
162   the cluster, generates the SSL certificate and starts the node daemon.
163
164   @type master_name: str
165   @param master_name: Name of the master node
166
167   """
168   # Generate cluster secrets
169   GenerateClusterCrypto(True, False, False, False)
170
171   result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
172   if result.failed:
173     raise errors.OpExecError("Could not start the node daemon, command %s"
174                              " had exitcode %s and error %s" %
175                              (result.cmd, result.exit_code, result.output))
176
177   _WaitForNodeDaemon(master_name)
178
179
180 def _WaitForNodeDaemon(node_name):
181   """Wait for node daemon to become responsive.
182
183   """
184   def _CheckNodeDaemon():
185     result = rpc.RpcRunner.call_version([node_name])[node_name]
186     if result.fail_msg:
187       raise utils.RetryAgain()
188
189   try:
190     utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
191   except utils.RetryTimeout:
192     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
193                              " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
194
195
196 def _WaitForMasterDaemon():
197   """Wait for master daemon to become responsive.
198
199   """
200   def _CheckMasterDaemon():
201     try:
202       cl = luxi.Client()
203       (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
204     except Exception:
205       raise utils.RetryAgain()
206
207     logging.debug("Received cluster name %s from master", cluster_name)
208
209   try:
210     utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
211   except utils.RetryTimeout:
212     raise errors.OpExecError("Master daemon didn't answer queries within"
213                              " %s seconds" % _DAEMON_READY_TIMEOUT)
214
215
216 def _InitFileStorage(file_storage_dir):
217   """Initialize if needed the file storage.
218
219   @param file_storage_dir: the user-supplied value
220   @return: either empty string (if file storage was disabled at build
221       time) or the normalized path to the storage directory
222
223   """
224   if not constants.ENABLE_FILE_STORAGE:
225     return ""
226
227   file_storage_dir = os.path.normpath(file_storage_dir)
228
229   if not os.path.isabs(file_storage_dir):
230     raise errors.OpPrereqError("The file storage directory you passed is"
231                                " not an absolute path.", errors.ECODE_INVAL)
232
233   if not os.path.exists(file_storage_dir):
234     try:
235       os.makedirs(file_storage_dir, 0750)
236     except OSError, err:
237       raise errors.OpPrereqError("Cannot create file storage directory"
238                                  " '%s': %s" % (file_storage_dir, err),
239                                  errors.ECODE_ENVIRON)
240
241   if not os.path.isdir(file_storage_dir):
242     raise errors.OpPrereqError("The file storage directory '%s' is not"
243                                " a directory." % file_storage_dir,
244                                errors.ECODE_ENVIRON)
245   return file_storage_dir
246
247
248 def _InitSharedFileStorage(shared_file_storage_dir):
249   """Initialize if needed the shared file storage.
250
251   @param shared_file_storage_dir: the user-supplied value
252   @return: either empty string (if file storage was disabled at build
253       time) or the normalized path to the storage directory
254
255   """
256   if not constants.ENABLE_SHARED_FILE_STORAGE:
257     return ""
258
259   shared_file_storage_dir = os.path.normpath(shared_file_storage_dir)
260
261   if not os.path.isabs(shared_file_storage_dir):
262     raise errors.OpPrereqError("The shared file storage directory you"
263                                " passed is not an absolute path.",
264                                errors.ECODE_INVAL)
265
266   if not os.path.exists(shared_file_storage_dir):
267     try:
268       os.makedirs(shared_file_storage_dir, 0750)
269     except OSError, err:
270       raise errors.OpPrereqError("Cannot create file storage directory"
271                                  " '%s': %s" % (shared_file_storage_dir, err),
272                                  errors.ECODE_ENVIRON)
273
274   if not os.path.isdir(shared_file_storage_dir):
275     raise errors.OpPrereqError("The file storage directory '%s' is not"
276                                " a directory." % shared_file_storage_dir,
277                                errors.ECODE_ENVIRON)
278   return shared_file_storage_dir
279
280
281 def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
282                 master_netdev, file_storage_dir, shared_file_storage_dir,
283                 candidate_pool_size, secondary_ip=None, vg_name=None,
284                 beparams=None, nicparams=None, ndparams=None, hvparams=None,
285                 enabled_hypervisors=None, modify_etc_hosts=True,
286                 modify_ssh_setup=True, maintain_node_health=False,
287                 drbd_helper=None, uid_pool=None, default_iallocator=None,
288                 primary_ip_version=None, prealloc_wipe_disks=False):
289   """Initialise the cluster.
290
291   @type candidate_pool_size: int
292   @param candidate_pool_size: master candidate pool size
293
294   """
295   # TODO: complete the docstring
296   if config.ConfigWriter.IsCluster():
297     raise errors.OpPrereqError("Cluster is already initialised",
298                                errors.ECODE_STATE)
299
300   if not enabled_hypervisors:
301     raise errors.OpPrereqError("Enabled hypervisors list must contain at"
302                                " least one member", errors.ECODE_INVAL)
303   invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
304   if invalid_hvs:
305     raise errors.OpPrereqError("Enabled hypervisors contains invalid"
306                                " entries: %s" % invalid_hvs,
307                                errors.ECODE_INVAL)
308
309
310   ipcls = None
311   if primary_ip_version == constants.IP4_VERSION:
312     ipcls = netutils.IP4Address
313   elif primary_ip_version == constants.IP6_VERSION:
314     ipcls = netutils.IP6Address
315   else:
316     raise errors.OpPrereqError("Invalid primary ip version: %d." %
317                                primary_ip_version)
318
319   hostname = netutils.GetHostname(family=ipcls.family)
320   if not ipcls.IsValid(hostname.ip):
321     raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
322                                " address." % (hostname.ip, primary_ip_version))
323
324   if ipcls.IsLoopback(hostname.ip):
325     raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
326                                " address. Please fix DNS or %s." %
327                                (hostname.ip, constants.ETC_HOSTS),
328                                errors.ECODE_ENVIRON)
329
330   if not ipcls.Own(hostname.ip):
331     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
332                                " to %s,\nbut this ip address does not"
333                                " belong to this host" %
334                                hostname.ip, errors.ECODE_ENVIRON)
335
336   clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
337
338   if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
339     raise errors.OpPrereqError("Cluster IP already active",
340                                errors.ECODE_NOTUNIQUE)
341
342   if not secondary_ip:
343     if primary_ip_version == constants.IP6_VERSION:
344       raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
345                                  " IPv4 address must be given as secondary",
346                                  errors.ECODE_INVAL)
347     secondary_ip = hostname.ip
348
349   if not netutils.IP4Address.IsValid(secondary_ip):
350     raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
351                                " IPv4 address." % secondary_ip,
352                                errors.ECODE_INVAL)
353
354   if not netutils.IP4Address.Own(secondary_ip):
355     raise errors.OpPrereqError("You gave %s as secondary IP,"
356                                " but it does not belong to this host." %
357                                secondary_ip, errors.ECODE_ENVIRON)
358
359   if vg_name is not None:
360     # Check if volume group is valid
361     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
362                                           constants.MIN_VG_SIZE)
363     if vgstatus:
364       raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
365                                  " you are not using lvm" % vgstatus,
366                                  errors.ECODE_INVAL)
367
368   if drbd_helper is not None:
369     try:
370       curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
371     except errors.BlockDeviceError, err:
372       raise errors.OpPrereqError("Error while checking drbd helper"
373                                  " (specify --no-drbd-storage if you are not"
374                                  " using drbd): %s" % str(err),
375                                  errors.ECODE_ENVIRON)
376     if drbd_helper != curr_helper:
377       raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
378                                  " is the current helper" % (drbd_helper,
379                                                              curr_helper),
380                                  errors.ECODE_INVAL)
381
382   file_storage_dir = _InitFileStorage(file_storage_dir)
383   shared_file_storage_dir = _InitSharedFileStorage(shared_file_storage_dir)
384
385   if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
386     raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
387                                errors.ECODE_INVAL)
388
389   result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
390   if result.failed:
391     raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
392                                (master_netdev,
393                                 result.output.strip()), errors.ECODE_INVAL)
394
395   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
396   utils.EnsureDirs(dirs)
397
398   utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
399   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
400   objects.NIC.CheckParameterSyntax(nicparams)
401
402   if ndparams is not None:
403     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
404   else:
405     ndparams = dict(constants.NDC_DEFAULTS)
406
407   # hvparams is a mapping of hypervisor->hvparams dict
408   for hv_name, hv_params in hvparams.iteritems():
409     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
410     hv_class = hypervisor.GetHypervisor(hv_name)
411     hv_class.CheckParameterSyntax(hv_params)
412
413   # set up ssh config and /etc/hosts
414   sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
415   sshkey = sshline.split(" ")[1]
416
417   if modify_etc_hosts:
418     utils.AddHostToEtcHosts(hostname.name, hostname.ip)
419
420   if modify_ssh_setup:
421     _InitSSHSetup()
422
423   if default_iallocator is not None:
424     alloc_script = utils.FindFile(default_iallocator,
425                                   constants.IALLOCATOR_SEARCH_PATH,
426                                   os.path.isfile)
427     if alloc_script is None:
428       raise errors.OpPrereqError("Invalid default iallocator script '%s'"
429                                  " specified" % default_iallocator,
430                                  errors.ECODE_INVAL)
431   elif constants.HTOOLS:
432     # htools was enabled at build-time, we default to it
433     if utils.FindFile(constants.IALLOC_HAIL,
434                       constants.IALLOCATOR_SEARCH_PATH,
435                       os.path.isfile):
436       default_iallocator = constants.IALLOC_HAIL
437
438   now = time.time()
439
440   # init of cluster config file
441   cluster_config = objects.Cluster(
442     serial_no=1,
443     rsahostkeypub=sshkey,
444     highest_used_port=(constants.FIRST_DRBD_PORT - 1),
445     mac_prefix=mac_prefix,
446     volume_group_name=vg_name,
447     tcpudp_port_pool=set(),
448     master_node=hostname.name,
449     master_ip=clustername.ip,
450     master_netdev=master_netdev,
451     cluster_name=clustername.name,
452     file_storage_dir=file_storage_dir,
453     shared_file_storage_dir=shared_file_storage_dir,
454     enabled_hypervisors=enabled_hypervisors,
455     beparams={constants.PP_DEFAULT: beparams},
456     nicparams={constants.PP_DEFAULT: nicparams},
457     ndparams=ndparams,
458     hvparams=hvparams,
459     candidate_pool_size=candidate_pool_size,
460     modify_etc_hosts=modify_etc_hosts,
461     modify_ssh_setup=modify_ssh_setup,
462     uid_pool=uid_pool,
463     ctime=now,
464     mtime=now,
465     maintain_node_health=maintain_node_health,
466     drbd_usermode_helper=drbd_helper,
467     default_iallocator=default_iallocator,
468     primary_ip_family=ipcls.family,
469     prealloc_wipe_disks=prealloc_wipe_disks,
470     )
471   master_node_config = objects.Node(name=hostname.name,
472                                     primary_ip=hostname.ip,
473                                     secondary_ip=secondary_ip,
474                                     serial_no=1,
475                                     master_candidate=True,
476                                     offline=False, drained=False,
477                                     ctime=now, mtime=now,
478                                     )
479   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
480   cfg = config.ConfigWriter(offline=True)
481   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
482   cfg.Update(cfg.GetClusterInfo(), logging.error)
483   backend.WriteSsconfFiles(cfg.GetSsconfValues())
484
485   # set up the inter-node password and certificate
486   _InitGanetiServerSetup(hostname.name)
487
488   logging.debug("Starting daemons")
489   result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
490   if result.failed:
491     raise errors.OpExecError("Could not start daemons, command %s"
492                              " had exitcode %s and error %s" %
493                              (result.cmd, result.exit_code, result.output))
494
495   _WaitForMasterDaemon()
496
497
498 def InitConfig(version, cluster_config, master_node_config,
499                cfg_file=constants.CLUSTER_CONF_FILE):
500   """Create the initial cluster configuration.
501
502   It will contain the current node, which will also be the master
503   node, and no instances.
504
505   @type version: int
506   @param version: configuration version
507   @type cluster_config: L{objects.Cluster}
508   @param cluster_config: cluster configuration
509   @type master_node_config: L{objects.Node}
510   @param master_node_config: master node configuration
511   @type cfg_file: string
512   @param cfg_file: configuration file path
513
514   """
515   uuid_generator = config.TemporaryReservationManager()
516   cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
517                                                 _INITCONF_ECID)
518   master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
519                                                     _INITCONF_ECID)
520   nodes = {
521     master_node_config.name: master_node_config,
522     }
523   default_nodegroup = objects.NodeGroup(
524     uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
525     name=constants.INITIAL_NODE_GROUP_NAME,
526     members=[master_node_config.name],
527     )
528   nodegroups = {
529     default_nodegroup.uuid: default_nodegroup,
530     }
531   now = time.time()
532   config_data = objects.ConfigData(version=version,
533                                    cluster=cluster_config,
534                                    nodegroups=nodegroups,
535                                    nodes=nodes,
536                                    instances={},
537                                    serial_no=1,
538                                    ctime=now, mtime=now)
539   utils.WriteFile(cfg_file,
540                   data=serializer.Dump(config_data.ToDict()),
541                   mode=0600)
542
543
544 def FinalizeClusterDestroy(master):
545   """Execute the last steps of cluster destroy
546
547   This function shuts down all the daemons, completing the destroy
548   begun in cmdlib.LUDestroyOpcode.
549
550   """
551   cfg = config.ConfigWriter()
552   modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
553   result = rpc.RpcRunner.call_node_stop_master(master, True)
554   msg = result.fail_msg
555   if msg:
556     logging.warning("Could not disable the master role: %s", msg)
557   result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
558   msg = result.fail_msg
559   if msg:
560     logging.warning("Could not shutdown the node daemon and cleanup"
561                     " the node: %s", msg)
562
563
564 def SetupNodeDaemon(cluster_name, node, ssh_key_check):
565   """Add a node to the cluster.
566
567   This function must be called before the actual opcode, and will ssh
568   to the remote node, copy the needed files, and start ganeti-noded,
569   allowing the master to do the rest via normal rpc calls.
570
571   @param cluster_name: the cluster name
572   @param node: the name of the new node
573   @param ssh_key_check: whether to do a strict key check
574
575   """
576   family = ssconf.SimpleStore().GetPrimaryIPFamily()
577   sshrunner = ssh.SshRunner(cluster_name,
578                             ipv6=(family == netutils.IP6Address.family))
579
580   bind_address = constants.IP4_ADDRESS_ANY
581   if family == netutils.IP6Address.family:
582     bind_address = constants.IP6_ADDRESS_ANY
583
584   # set up inter-node password and certificate and restarts the node daemon
585   # and then connect with ssh to set password and start ganeti-noded
586   # note that all the below variables are sanitized at this point,
587   # either by being constants or by the checks above
588   sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
589   sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
590   sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
591   mycommand = ("%s stop-all; %s start %s -b %s" %
592                (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
593                 utils.ShellQuote(bind_address)))
594
595   result = sshrunner.Run(node, 'root', mycommand, batch=False,
596                          ask_key=ssh_key_check,
597                          use_cluster_key=True,
598                          strict_host_check=ssh_key_check)
599   if result.failed:
600     raise errors.OpExecError("Remote command on node %s, error: %s,"
601                              " output: %s" %
602                              (node, result.fail_reason, result.output))
603
604   _WaitForNodeDaemon(node)
605
606
607 def MasterFailover(no_voting=False):
608   """Failover the master node.
609
610   This checks that we are not already the master, and will cause the
611   current master to cease being master, and the non-master to become
612   new master.
613
614   @type no_voting: boolean
615   @param no_voting: force the operation without remote nodes agreement
616                       (dangerous)
617
618   """
619   sstore = ssconf.SimpleStore()
620
621   old_master, new_master = ssconf.GetMasterAndMyself(sstore)
622   node_list = sstore.GetNodeList()
623   mc_list = sstore.GetMasterCandidates()
624
625   if old_master == new_master:
626     raise errors.OpPrereqError("This commands must be run on the node"
627                                " where you want the new master to be."
628                                " %s is already the master" %
629                                old_master, errors.ECODE_INVAL)
630
631   if new_master not in mc_list:
632     mc_no_master = [name for name in mc_list if name != old_master]
633     raise errors.OpPrereqError("This node is not among the nodes marked"
634                                " as master candidates. Only these nodes"
635                                " can become masters. Current list of"
636                                " master candidates is:\n"
637                                "%s" % ('\n'.join(mc_no_master)),
638                                errors.ECODE_STATE)
639
640   if not no_voting:
641     vote_list = GatherMasterVotes(node_list)
642
643     if vote_list:
644       voted_master = vote_list[0][0]
645       if voted_master is None:
646         raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
647                                    " not respond.", errors.ECODE_ENVIRON)
648       elif voted_master != old_master:
649         raise errors.OpPrereqError("I have a wrong configuration, I believe"
650                                    " the master is %s but the other nodes"
651                                    " voted %s. Please resync the configuration"
652                                    " of this node." %
653                                    (old_master, voted_master),
654                                    errors.ECODE_STATE)
655   # end checks
656
657   rcode = 0
658
659   logging.info("Setting master to %s, old master: %s", new_master, old_master)
660
661   try:
662     # instantiate a real config writer, as we now know we have the
663     # configuration data
664     cfg = config.ConfigWriter(accept_foreign=True)
665
666     cluster_info = cfg.GetClusterInfo()
667     cluster_info.master_node = new_master
668     # this will also regenerate the ssconf files, since we updated the
669     # cluster info
670     cfg.Update(cluster_info, logging.error)
671   except errors.ConfigurationError, err:
672     logging.error("Error while trying to set the new master: %s",
673                   str(err))
674     return 1
675
676   # if cfg.Update worked, then it means the old master daemon won't be
677   # able now to write its own config file (we rely on locking in both
678   # backend.UploadFile() and ConfigWriter._Write(); hence the next
679   # step is to kill the old master
680
681   logging.info("Stopping the master daemon on node %s", old_master)
682
683   result = rpc.RpcRunner.call_node_stop_master(old_master, True)
684   msg = result.fail_msg
685   if msg:
686     logging.error("Could not disable the master role on the old master"
687                  " %s, please disable manually: %s", old_master, msg)
688
689   logging.info("Checking master IP non-reachability...")
690
691   master_ip = sstore.GetMasterIP()
692   total_timeout = 30
693   # Here we have a phase where no master should be running
694   def _check_ip():
695     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
696       raise utils.RetryAgain()
697
698   try:
699     utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
700   except utils.RetryTimeout:
701     logging.warning("The master IP is still reachable after %s seconds,"
702                     " continuing but activating the master on the current"
703                     " node will probably fail", total_timeout)
704
705   if jstore.CheckDrainFlag():
706     logging.info("Undraining job queue")
707     jstore.SetDrainFlag(False)
708
709   logging.info("Starting the master daemons on the new master")
710
711   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
712   msg = result.fail_msg
713   if msg:
714     logging.error("Could not start the master role on the new master"
715                   " %s, please check: %s", new_master, msg)
716     rcode = 1
717
718   logging.info("Master failed over from %s to %s", old_master, new_master)
719   return rcode
720
721
722 def GetMaster():
723   """Returns the current master node.
724
725   This is a separate function in bootstrap since it's needed by
726   gnt-cluster, and instead of importing directly ssconf, it's better
727   to abstract it in bootstrap, where we do use ssconf in other
728   functions too.
729
730   """
731   sstore = ssconf.SimpleStore()
732
733   old_master, _ = ssconf.GetMasterAndMyself(sstore)
734
735   return old_master
736
737
738 def GatherMasterVotes(node_list):
739   """Check the agreement on who is the master.
740
741   This function will return a list of (node, number of votes), ordered
742   by the number of votes. Errors will be denoted by the key 'None'.
743
744   Note that the sum of votes is the number of nodes this machine
745   knows, whereas the number of entries in the list could be different
746   (if some nodes vote for another master).
747
748   We remove ourselves from the list since we know that (bugs aside)
749   since we use the same source for configuration information for both
750   backend and boostrap, we'll always vote for ourselves.
751
752   @type node_list: list
753   @param node_list: the list of nodes to query for master info; the current
754       node will be removed if it is in the list
755   @rtype: list
756   @return: list of (node, votes)
757
758   """
759   myself = netutils.Hostname.GetSysName()
760   try:
761     node_list.remove(myself)
762   except ValueError:
763     pass
764   if not node_list:
765     # no nodes left (eventually after removing myself)
766     return []
767   results = rpc.RpcRunner.call_master_info(node_list)
768   if not isinstance(results, dict):
769     # this should not happen (unless internal error in rpc)
770     logging.critical("Can't complete rpc call, aborting master startup")
771     return [(None, len(node_list))]
772   votes = {}
773   for node in results:
774     nres = results[node]
775     data = nres.payload
776     msg = nres.fail_msg
777     fail = False
778     if msg:
779       logging.warning("Error contacting node %s: %s", node, msg)
780       fail = True
781     # for now we accept both length 3 and 4 (data[3] is primary ip version)
782     elif not isinstance(data, (tuple, list)) or len(data) < 3:
783       logging.warning("Invalid data received from node %s: %s", node, data)
784       fail = True
785     if fail:
786       if None not in votes:
787         votes[None] = 0
788       votes[None] += 1
789       continue
790     master_node = data[2]
791     if master_node not in votes:
792       votes[master_node] = 0
793     votes[master_node] += 1
794
795   vote_list = [v for v in votes.items()]
796   # sort first on number of votes then on name, since we want None
797   # sorted later if we have the half of the nodes not responding, and
798   # half voting all for the same master
799   vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
800
801   return vote_list