Small improvements for cluster verify
[ganeti-local] / lib / bootstrap.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions to bootstrap a new cluster.
23
24 """
25
26 import os
27 import os.path
28 import re
29 import logging
30 import time
31
32 from ganeti import rpc
33 from ganeti import ssh
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import config
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40 from ganeti import serializer
41 from ganeti import hypervisor
42 from ganeti import bdev
43 from ganeti import netutils
44 from ganeti import backend
45 from ganeti import luxi
46 from ganeti import jstore
47
48
49 # ec_id for InitConfig's temporary reservation manager
50 _INITCONF_ECID = "initconfig-ecid"
51
52 #: After how many seconds daemon must be responsive
53 _DAEMON_READY_TIMEOUT = 10.0
54
55
56 def _InitSSHSetup():
57   """Setup the SSH configuration for the cluster.
58
59   This generates a dsa keypair for root, adds the pub key to the
60   permitted hosts and adds the hostkey to its own known hosts.
61
62   """
63   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
64
65   for name in priv_key, pub_key:
66     if os.path.exists(name):
67       utils.CreateBackup(name)
68     utils.RemoveFile(name)
69
70   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
71                          "-f", priv_key,
72                          "-q", "-N", ""])
73   if result.failed:
74     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
75                              result.output)
76
77   utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
78
79
80 def GenerateHmacKey(file_name):
81   """Writes a new HMAC key.
82
83   @type file_name: str
84   @param file_name: Path to output file
85
86   """
87   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
88                   backup=True)
89
90
91 def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
92                           new_cds, rapi_cert_pem=None, cds=None,
93                           nodecert_file=constants.NODED_CERT_FILE,
94                           rapicert_file=constants.RAPI_CERT_FILE,
95                           hmackey_file=constants.CONFD_HMAC_KEY,
96                           cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
97   """Updates the cluster certificates, keys and secrets.
98
99   @type new_cluster_cert: bool
100   @param new_cluster_cert: Whether to generate a new cluster certificate
101   @type new_rapi_cert: bool
102   @param new_rapi_cert: Whether to generate a new RAPI certificate
103   @type new_confd_hmac_key: bool
104   @param new_confd_hmac_key: Whether to generate a new HMAC key
105   @type new_cds: bool
106   @param new_cds: Whether to generate a new cluster domain secret
107   @type rapi_cert_pem: string
108   @param rapi_cert_pem: New RAPI certificate in PEM format
109   @type cds: string
110   @param cds: New cluster domain secret
111   @type nodecert_file: string
112   @param nodecert_file: optional override of the node cert file path
113   @type rapicert_file: string
114   @param rapicert_file: optional override of the rapi cert file path
115   @type hmackey_file: string
116   @param hmackey_file: optional override of the hmac key file path
117
118   """
119   # noded SSL certificate
120   cluster_cert_exists = os.path.exists(nodecert_file)
121   if new_cluster_cert or not cluster_cert_exists:
122     if cluster_cert_exists:
123       utils.CreateBackup(nodecert_file)
124
125     logging.debug("Generating new cluster certificate at %s", nodecert_file)
126     utils.GenerateSelfSignedSslCert(nodecert_file)
127
128   # confd HMAC key
129   if new_confd_hmac_key or not os.path.exists(hmackey_file):
130     logging.debug("Writing new confd HMAC key to %s", hmackey_file)
131     GenerateHmacKey(hmackey_file)
132
133   # RAPI
134   rapi_cert_exists = os.path.exists(rapicert_file)
135
136   if rapi_cert_pem:
137     # Assume rapi_pem contains a valid PEM-formatted certificate and key
138     logging.debug("Writing RAPI certificate at %s", rapicert_file)
139     utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
140
141   elif new_rapi_cert or not rapi_cert_exists:
142     if rapi_cert_exists:
143       utils.CreateBackup(rapicert_file)
144
145     logging.debug("Generating new RAPI certificate at %s", rapicert_file)
146     utils.GenerateSelfSignedSslCert(rapicert_file)
147
148   # Cluster domain secret
149   if cds:
150     logging.debug("Writing cluster domain secret to %s", cds_file)
151     utils.WriteFile(cds_file, data=cds, backup=True)
152
153   elif new_cds or not os.path.exists(cds_file):
154     logging.debug("Generating new cluster domain secret at %s", cds_file)
155     GenerateHmacKey(cds_file)
156
157
158 def _InitGanetiServerSetup(master_name):
159   """Setup the necessary configuration for the initial node daemon.
160
161   This creates the nodepass file containing the shared password for
162   the cluster, generates the SSL certificate and starts the node daemon.
163
164   @type master_name: str
165   @param master_name: Name of the master node
166
167   """
168   # Generate cluster secrets
169   GenerateClusterCrypto(True, False, False, False)
170
171   result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
172   if result.failed:
173     raise errors.OpExecError("Could not start the node daemon, command %s"
174                              " had exitcode %s and error %s" %
175                              (result.cmd, result.exit_code, result.output))
176
177   _WaitForNodeDaemon(master_name)
178
179
180 def _WaitForNodeDaemon(node_name):
181   """Wait for node daemon to become responsive.
182
183   """
184   def _CheckNodeDaemon():
185     result = rpc.RpcRunner.call_version([node_name])[node_name]
186     if result.fail_msg:
187       raise utils.RetryAgain()
188
189   try:
190     utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
191   except utils.RetryTimeout:
192     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
193                              " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
194
195
196 def _WaitForMasterDaemon():
197   """Wait for master daemon to become responsive.
198
199   """
200   def _CheckMasterDaemon():
201     try:
202       cl = luxi.Client()
203       (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
204     except Exception:
205       raise utils.RetryAgain()
206
207     logging.debug("Received cluster name %s from master", cluster_name)
208
209   try:
210     utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
211   except utils.RetryTimeout:
212     raise errors.OpExecError("Master daemon didn't answer queries within"
213                              " %s seconds" % _DAEMON_READY_TIMEOUT)
214
215
216 def _InitFileStorage(file_storage_dir):
217   """Initialize if needed the file storage.
218
219   @param file_storage_dir: the user-supplied value
220   @return: either empty string (if file storage was disabled at build
221       time) or the normalized path to the storage directory
222
223   """
224   file_storage_dir = os.path.normpath(file_storage_dir)
225
226   if not os.path.isabs(file_storage_dir):
227     raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
228                                " path" % file_storage_dir, errors.ECODE_INVAL)
229
230   if not os.path.exists(file_storage_dir):
231     try:
232       os.makedirs(file_storage_dir, 0750)
233     except OSError, err:
234       raise errors.OpPrereqError("Cannot create file storage directory"
235                                  " '%s': %s" % (file_storage_dir, err),
236                                  errors.ECODE_ENVIRON)
237
238   if not os.path.isdir(file_storage_dir):
239     raise errors.OpPrereqError("The file storage directory '%s' is not"
240                                " a directory." % file_storage_dir,
241                                errors.ECODE_ENVIRON)
242   return file_storage_dir
243
244
245 def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
246                 master_netdev, file_storage_dir, shared_file_storage_dir,
247                 candidate_pool_size, secondary_ip=None, vg_name=None,
248                 beparams=None, nicparams=None, ndparams=None, hvparams=None,
249                 enabled_hypervisors=None, modify_etc_hosts=True,
250                 modify_ssh_setup=True, maintain_node_health=False,
251                 drbd_helper=None, uid_pool=None, default_iallocator=None,
252                 primary_ip_version=None, prealloc_wipe_disks=False):
253   """Initialise the cluster.
254
255   @type candidate_pool_size: int
256   @param candidate_pool_size: master candidate pool size
257
258   """
259   # TODO: complete the docstring
260   if config.ConfigWriter.IsCluster():
261     raise errors.OpPrereqError("Cluster is already initialised",
262                                errors.ECODE_STATE)
263
264   if not enabled_hypervisors:
265     raise errors.OpPrereqError("Enabled hypervisors list must contain at"
266                                " least one member", errors.ECODE_INVAL)
267   invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
268   if invalid_hvs:
269     raise errors.OpPrereqError("Enabled hypervisors contains invalid"
270                                " entries: %s" % invalid_hvs,
271                                errors.ECODE_INVAL)
272
273
274   ipcls = None
275   if primary_ip_version == constants.IP4_VERSION:
276     ipcls = netutils.IP4Address
277   elif primary_ip_version == constants.IP6_VERSION:
278     ipcls = netutils.IP6Address
279   else:
280     raise errors.OpPrereqError("Invalid primary ip version: %d." %
281                                primary_ip_version)
282
283   hostname = netutils.GetHostname(family=ipcls.family)
284   if not ipcls.IsValid(hostname.ip):
285     raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
286                                " address." % (hostname.ip, primary_ip_version))
287
288   if ipcls.IsLoopback(hostname.ip):
289     raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
290                                " address. Please fix DNS or %s." %
291                                (hostname.ip, constants.ETC_HOSTS),
292                                errors.ECODE_ENVIRON)
293
294   if not ipcls.Own(hostname.ip):
295     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
296                                " to %s,\nbut this ip address does not"
297                                " belong to this host" %
298                                hostname.ip, errors.ECODE_ENVIRON)
299
300   clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
301
302   if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
303     raise errors.OpPrereqError("Cluster IP already active",
304                                errors.ECODE_NOTUNIQUE)
305
306   if not secondary_ip:
307     if primary_ip_version == constants.IP6_VERSION:
308       raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
309                                  " IPv4 address must be given as secondary",
310                                  errors.ECODE_INVAL)
311     secondary_ip = hostname.ip
312
313   if not netutils.IP4Address.IsValid(secondary_ip):
314     raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
315                                " IPv4 address." % secondary_ip,
316                                errors.ECODE_INVAL)
317
318   if not netutils.IP4Address.Own(secondary_ip):
319     raise errors.OpPrereqError("You gave %s as secondary IP,"
320                                " but it does not belong to this host." %
321                                secondary_ip, errors.ECODE_ENVIRON)
322
323   if vg_name is not None:
324     # Check if volume group is valid
325     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
326                                           constants.MIN_VG_SIZE)
327     if vgstatus:
328       raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
329                                  " you are not using lvm" % vgstatus,
330                                  errors.ECODE_INVAL)
331
332   if drbd_helper is not None:
333     try:
334       curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
335     except errors.BlockDeviceError, err:
336       raise errors.OpPrereqError("Error while checking drbd helper"
337                                  " (specify --no-drbd-storage if you are not"
338                                  " using drbd): %s" % str(err),
339                                  errors.ECODE_ENVIRON)
340     if drbd_helper != curr_helper:
341       raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
342                                  " is the current helper" % (drbd_helper,
343                                                              curr_helper),
344                                  errors.ECODE_INVAL)
345
346   if constants.ENABLE_FILE_STORAGE:
347     file_storage_dir = _InitFileStorage(file_storage_dir)
348   else:
349     file_storage_dir = ""
350
351   if constants.ENABLE_SHARED_FILE_STORAGE:
352     shared_file_storage_dir = _InitFileStorage(shared_file_storage_dir)
353   else:
354     shared_file_storage_dir = ""
355
356   if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
357     raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
358                                errors.ECODE_INVAL)
359
360   result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
361   if result.failed:
362     raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
363                                (master_netdev,
364                                 result.output.strip()), errors.ECODE_INVAL)
365
366   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
367   utils.EnsureDirs(dirs)
368
369   utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
370   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
371   objects.NIC.CheckParameterSyntax(nicparams)
372
373   if ndparams is not None:
374     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
375   else:
376     ndparams = dict(constants.NDC_DEFAULTS)
377
378   # hvparams is a mapping of hypervisor->hvparams dict
379   for hv_name, hv_params in hvparams.iteritems():
380     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
381     hv_class = hypervisor.GetHypervisor(hv_name)
382     hv_class.CheckParameterSyntax(hv_params)
383
384   # set up ssh config and /etc/hosts
385   sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
386   sshkey = sshline.split(" ")[1]
387
388   if modify_etc_hosts:
389     utils.AddHostToEtcHosts(hostname.name, hostname.ip)
390
391   if modify_ssh_setup:
392     _InitSSHSetup()
393
394   if default_iallocator is not None:
395     alloc_script = utils.FindFile(default_iallocator,
396                                   constants.IALLOCATOR_SEARCH_PATH,
397                                   os.path.isfile)
398     if alloc_script is None:
399       raise errors.OpPrereqError("Invalid default iallocator script '%s'"
400                                  " specified" % default_iallocator,
401                                  errors.ECODE_INVAL)
402   elif constants.HTOOLS:
403     # htools was enabled at build-time, we default to it
404     if utils.FindFile(constants.IALLOC_HAIL,
405                       constants.IALLOCATOR_SEARCH_PATH,
406                       os.path.isfile):
407       default_iallocator = constants.IALLOC_HAIL
408
409   now = time.time()
410
411   # init of cluster config file
412   cluster_config = objects.Cluster(
413     serial_no=1,
414     rsahostkeypub=sshkey,
415     highest_used_port=(constants.FIRST_DRBD_PORT - 1),
416     mac_prefix=mac_prefix,
417     volume_group_name=vg_name,
418     tcpudp_port_pool=set(),
419     master_node=hostname.name,
420     master_ip=clustername.ip,
421     master_netdev=master_netdev,
422     cluster_name=clustername.name,
423     file_storage_dir=file_storage_dir,
424     shared_file_storage_dir=shared_file_storage_dir,
425     enabled_hypervisors=enabled_hypervisors,
426     beparams={constants.PP_DEFAULT: beparams},
427     nicparams={constants.PP_DEFAULT: nicparams},
428     ndparams=ndparams,
429     hvparams=hvparams,
430     candidate_pool_size=candidate_pool_size,
431     modify_etc_hosts=modify_etc_hosts,
432     modify_ssh_setup=modify_ssh_setup,
433     uid_pool=uid_pool,
434     ctime=now,
435     mtime=now,
436     maintain_node_health=maintain_node_health,
437     drbd_usermode_helper=drbd_helper,
438     default_iallocator=default_iallocator,
439     primary_ip_family=ipcls.family,
440     prealloc_wipe_disks=prealloc_wipe_disks,
441     )
442   master_node_config = objects.Node(name=hostname.name,
443                                     primary_ip=hostname.ip,
444                                     secondary_ip=secondary_ip,
445                                     serial_no=1,
446                                     master_candidate=True,
447                                     offline=False, drained=False,
448                                     ctime=now, mtime=now,
449                                     )
450   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
451   cfg = config.ConfigWriter(offline=True)
452   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
453   cfg.Update(cfg.GetClusterInfo(), logging.error)
454   backend.WriteSsconfFiles(cfg.GetSsconfValues())
455
456   # set up the inter-node password and certificate
457   _InitGanetiServerSetup(hostname.name)
458
459   logging.debug("Starting daemons")
460   result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
461   if result.failed:
462     raise errors.OpExecError("Could not start daemons, command %s"
463                              " had exitcode %s and error %s" %
464                              (result.cmd, result.exit_code, result.output))
465
466   _WaitForMasterDaemon()
467
468
469 def InitConfig(version, cluster_config, master_node_config,
470                cfg_file=constants.CLUSTER_CONF_FILE):
471   """Create the initial cluster configuration.
472
473   It will contain the current node, which will also be the master
474   node, and no instances.
475
476   @type version: int
477   @param version: configuration version
478   @type cluster_config: L{objects.Cluster}
479   @param cluster_config: cluster configuration
480   @type master_node_config: L{objects.Node}
481   @param master_node_config: master node configuration
482   @type cfg_file: string
483   @param cfg_file: configuration file path
484
485   """
486   uuid_generator = config.TemporaryReservationManager()
487   cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
488                                                 _INITCONF_ECID)
489   master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
490                                                     _INITCONF_ECID)
491   nodes = {
492     master_node_config.name: master_node_config,
493     }
494   default_nodegroup = objects.NodeGroup(
495     uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
496     name=constants.INITIAL_NODE_GROUP_NAME,
497     members=[master_node_config.name],
498     )
499   nodegroups = {
500     default_nodegroup.uuid: default_nodegroup,
501     }
502   now = time.time()
503   config_data = objects.ConfigData(version=version,
504                                    cluster=cluster_config,
505                                    nodegroups=nodegroups,
506                                    nodes=nodes,
507                                    instances={},
508                                    serial_no=1,
509                                    ctime=now, mtime=now)
510   utils.WriteFile(cfg_file,
511                   data=serializer.Dump(config_data.ToDict()),
512                   mode=0600)
513
514
515 def FinalizeClusterDestroy(master):
516   """Execute the last steps of cluster destroy
517
518   This function shuts down all the daemons, completing the destroy
519   begun in cmdlib.LUDestroyOpcode.
520
521   """
522   cfg = config.ConfigWriter()
523   modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
524   result = rpc.RpcRunner.call_node_stop_master(master, True)
525   msg = result.fail_msg
526   if msg:
527     logging.warning("Could not disable the master role: %s", msg)
528   result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
529   msg = result.fail_msg
530   if msg:
531     logging.warning("Could not shutdown the node daemon and cleanup"
532                     " the node: %s", msg)
533
534
535 def SetupNodeDaemon(cluster_name, node, ssh_key_check):
536   """Add a node to the cluster.
537
538   This function must be called before the actual opcode, and will ssh
539   to the remote node, copy the needed files, and start ganeti-noded,
540   allowing the master to do the rest via normal rpc calls.
541
542   @param cluster_name: the cluster name
543   @param node: the name of the new node
544   @param ssh_key_check: whether to do a strict key check
545
546   """
547   family = ssconf.SimpleStore().GetPrimaryIPFamily()
548   sshrunner = ssh.SshRunner(cluster_name,
549                             ipv6=(family == netutils.IP6Address.family))
550
551   bind_address = constants.IP4_ADDRESS_ANY
552   if family == netutils.IP6Address.family:
553     bind_address = constants.IP6_ADDRESS_ANY
554
555   # set up inter-node password and certificate and restarts the node daemon
556   # and then connect with ssh to set password and start ganeti-noded
557   # note that all the below variables are sanitized at this point,
558   # either by being constants or by the checks above
559   sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
560   sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
561   sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
562   mycommand = ("%s stop-all; %s start %s -b %s" %
563                (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
564                 utils.ShellQuote(bind_address)))
565
566   result = sshrunner.Run(node, 'root', mycommand, batch=False,
567                          ask_key=ssh_key_check,
568                          use_cluster_key=True,
569                          strict_host_check=ssh_key_check)
570   if result.failed:
571     raise errors.OpExecError("Remote command on node %s, error: %s,"
572                              " output: %s" %
573                              (node, result.fail_reason, result.output))
574
575   _WaitForNodeDaemon(node)
576
577
578 def MasterFailover(no_voting=False):
579   """Failover the master node.
580
581   This checks that we are not already the master, and will cause the
582   current master to cease being master, and the non-master to become
583   new master.
584
585   @type no_voting: boolean
586   @param no_voting: force the operation without remote nodes agreement
587                       (dangerous)
588
589   """
590   sstore = ssconf.SimpleStore()
591
592   old_master, new_master = ssconf.GetMasterAndMyself(sstore)
593   node_list = sstore.GetNodeList()
594   mc_list = sstore.GetMasterCandidates()
595
596   if old_master == new_master:
597     raise errors.OpPrereqError("This commands must be run on the node"
598                                " where you want the new master to be."
599                                " %s is already the master" %
600                                old_master, errors.ECODE_INVAL)
601
602   if new_master not in mc_list:
603     mc_no_master = [name for name in mc_list if name != old_master]
604     raise errors.OpPrereqError("This node is not among the nodes marked"
605                                " as master candidates. Only these nodes"
606                                " can become masters. Current list of"
607                                " master candidates is:\n"
608                                "%s" % ('\n'.join(mc_no_master)),
609                                errors.ECODE_STATE)
610
611   if not no_voting:
612     vote_list = GatherMasterVotes(node_list)
613
614     if vote_list:
615       voted_master = vote_list[0][0]
616       if voted_master is None:
617         raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
618                                    " not respond.", errors.ECODE_ENVIRON)
619       elif voted_master != old_master:
620         raise errors.OpPrereqError("I have a wrong configuration, I believe"
621                                    " the master is %s but the other nodes"
622                                    " voted %s. Please resync the configuration"
623                                    " of this node." %
624                                    (old_master, voted_master),
625                                    errors.ECODE_STATE)
626   # end checks
627
628   rcode = 0
629
630   logging.info("Setting master to %s, old master: %s", new_master, old_master)
631
632   try:
633     # instantiate a real config writer, as we now know we have the
634     # configuration data
635     cfg = config.ConfigWriter(accept_foreign=True)
636
637     cluster_info = cfg.GetClusterInfo()
638     cluster_info.master_node = new_master
639     # this will also regenerate the ssconf files, since we updated the
640     # cluster info
641     cfg.Update(cluster_info, logging.error)
642   except errors.ConfigurationError, err:
643     logging.error("Error while trying to set the new master: %s",
644                   str(err))
645     return 1
646
647   # if cfg.Update worked, then it means the old master daemon won't be
648   # able now to write its own config file (we rely on locking in both
649   # backend.UploadFile() and ConfigWriter._Write(); hence the next
650   # step is to kill the old master
651
652   logging.info("Stopping the master daemon on node %s", old_master)
653
654   result = rpc.RpcRunner.call_node_stop_master(old_master, True)
655   msg = result.fail_msg
656   if msg:
657     logging.error("Could not disable the master role on the old master"
658                  " %s, please disable manually: %s", old_master, msg)
659
660   logging.info("Checking master IP non-reachability...")
661
662   master_ip = sstore.GetMasterIP()
663   total_timeout = 30
664   # Here we have a phase where no master should be running
665   def _check_ip():
666     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
667       raise utils.RetryAgain()
668
669   try:
670     utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
671   except utils.RetryTimeout:
672     logging.warning("The master IP is still reachable after %s seconds,"
673                     " continuing but activating the master on the current"
674                     " node will probably fail", total_timeout)
675
676   if jstore.CheckDrainFlag():
677     logging.info("Undraining job queue")
678     jstore.SetDrainFlag(False)
679
680   logging.info("Starting the master daemons on the new master")
681
682   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
683   msg = result.fail_msg
684   if msg:
685     logging.error("Could not start the master role on the new master"
686                   " %s, please check: %s", new_master, msg)
687     rcode = 1
688
689   logging.info("Master failed over from %s to %s", old_master, new_master)
690   return rcode
691
692
693 def GetMaster():
694   """Returns the current master node.
695
696   This is a separate function in bootstrap since it's needed by
697   gnt-cluster, and instead of importing directly ssconf, it's better
698   to abstract it in bootstrap, where we do use ssconf in other
699   functions too.
700
701   """
702   sstore = ssconf.SimpleStore()
703
704   old_master, _ = ssconf.GetMasterAndMyself(sstore)
705
706   return old_master
707
708
709 def GatherMasterVotes(node_list):
710   """Check the agreement on who is the master.
711
712   This function will return a list of (node, number of votes), ordered
713   by the number of votes. Errors will be denoted by the key 'None'.
714
715   Note that the sum of votes is the number of nodes this machine
716   knows, whereas the number of entries in the list could be different
717   (if some nodes vote for another master).
718
719   We remove ourselves from the list since we know that (bugs aside)
720   since we use the same source for configuration information for both
721   backend and boostrap, we'll always vote for ourselves.
722
723   @type node_list: list
724   @param node_list: the list of nodes to query for master info; the current
725       node will be removed if it is in the list
726   @rtype: list
727   @return: list of (node, votes)
728
729   """
730   myself = netutils.Hostname.GetSysName()
731   try:
732     node_list.remove(myself)
733   except ValueError:
734     pass
735   if not node_list:
736     # no nodes left (eventually after removing myself)
737     return []
738   results = rpc.RpcRunner.call_master_info(node_list)
739   if not isinstance(results, dict):
740     # this should not happen (unless internal error in rpc)
741     logging.critical("Can't complete rpc call, aborting master startup")
742     return [(None, len(node_list))]
743   votes = {}
744   for node in results:
745     nres = results[node]
746     data = nres.payload
747     msg = nres.fail_msg
748     fail = False
749     if msg:
750       logging.warning("Error contacting node %s: %s", node, msg)
751       fail = True
752     # for now we accept both length 3 and 4 (data[3] is primary ip version)
753     elif not isinstance(data, (tuple, list)) or len(data) < 3:
754       logging.warning("Invalid data received from node %s: %s", node, data)
755       fail = True
756     if fail:
757       if None not in votes:
758         votes[None] = 0
759       votes[None] += 1
760       continue
761     master_node = data[2]
762     if master_node not in votes:
763       votes[master_node] = 0
764     votes[master_node] += 1
765
766   vote_list = [v for v in votes.items()]
767   # sort first on number of votes then on name, since we want None
768   # sorted later if we have the half of the nodes not responding, and
769   # half voting all for the same master
770   vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
771
772   return vote_list