Core shared file storage support
[ganeti-local] / lib / bootstrap.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions to bootstrap a new cluster.
23
24 """
25
26 import os
27 import os.path
28 import re
29 import logging
30 import time
31
32 from ganeti import rpc
33 from ganeti import ssh
34 from ganeti import utils
35 from ganeti import errors
36 from ganeti import config
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40 from ganeti import serializer
41 from ganeti import hypervisor
42 from ganeti import bdev
43 from ganeti import netutils
44 from ganeti import backend
45 from ganeti import luxi
46 from ganeti import jstore
47
48
49 # ec_id for InitConfig's temporary reservation manager
50 _INITCONF_ECID = "initconfig-ecid"
51
52 #: After how many seconds daemon must be responsive
53 _DAEMON_READY_TIMEOUT = 10.0
54
55
56 def _InitSSHSetup():
57   """Setup the SSH configuration for the cluster.
58
59   This generates a dsa keypair for root, adds the pub key to the
60   permitted hosts and adds the hostkey to its own known hosts.
61
62   """
63   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
64
65   for name in priv_key, pub_key:
66     if os.path.exists(name):
67       utils.CreateBackup(name)
68     utils.RemoveFile(name)
69
70   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
71                          "-f", priv_key,
72                          "-q", "-N", ""])
73   if result.failed:
74     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
75                              result.output)
76
77   utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
78
79
80 def GenerateHmacKey(file_name):
81   """Writes a new HMAC key.
82
83   @type file_name: str
84   @param file_name: Path to output file
85
86   """
87   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
88                   backup=True)
89
90
91 def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_confd_hmac_key,
92                           new_cds, rapi_cert_pem=None, cds=None,
93                           nodecert_file=constants.NODED_CERT_FILE,
94                           rapicert_file=constants.RAPI_CERT_FILE,
95                           hmackey_file=constants.CONFD_HMAC_KEY,
96                           cds_file=constants.CLUSTER_DOMAIN_SECRET_FILE):
97   """Updates the cluster certificates, keys and secrets.
98
99   @type new_cluster_cert: bool
100   @param new_cluster_cert: Whether to generate a new cluster certificate
101   @type new_rapi_cert: bool
102   @param new_rapi_cert: Whether to generate a new RAPI certificate
103   @type new_confd_hmac_key: bool
104   @param new_confd_hmac_key: Whether to generate a new HMAC key
105   @type new_cds: bool
106   @param new_cds: Whether to generate a new cluster domain secret
107   @type rapi_cert_pem: string
108   @param rapi_cert_pem: New RAPI certificate in PEM format
109   @type cds: string
110   @param cds: New cluster domain secret
111   @type nodecert_file: string
112   @param nodecert_file: optional override of the node cert file path
113   @type rapicert_file: string
114   @param rapicert_file: optional override of the rapi cert file path
115   @type hmackey_file: string
116   @param hmackey_file: optional override of the hmac key file path
117
118   """
119   # noded SSL certificate
120   cluster_cert_exists = os.path.exists(nodecert_file)
121   if new_cluster_cert or not cluster_cert_exists:
122     if cluster_cert_exists:
123       utils.CreateBackup(nodecert_file)
124
125     logging.debug("Generating new cluster certificate at %s", nodecert_file)
126     utils.GenerateSelfSignedSslCert(nodecert_file)
127
128   # confd HMAC key
129   if new_confd_hmac_key or not os.path.exists(hmackey_file):
130     logging.debug("Writing new confd HMAC key to %s", hmackey_file)
131     GenerateHmacKey(hmackey_file)
132
133   # RAPI
134   rapi_cert_exists = os.path.exists(rapicert_file)
135
136   if rapi_cert_pem:
137     # Assume rapi_pem contains a valid PEM-formatted certificate and key
138     logging.debug("Writing RAPI certificate at %s", rapicert_file)
139     utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
140
141   elif new_rapi_cert or not rapi_cert_exists:
142     if rapi_cert_exists:
143       utils.CreateBackup(rapicert_file)
144
145     logging.debug("Generating new RAPI certificate at %s", rapicert_file)
146     utils.GenerateSelfSignedSslCert(rapicert_file)
147
148   # Cluster domain secret
149   if cds:
150     logging.debug("Writing cluster domain secret to %s", cds_file)
151     utils.WriteFile(cds_file, data=cds, backup=True)
152
153   elif new_cds or not os.path.exists(cds_file):
154     logging.debug("Generating new cluster domain secret at %s", cds_file)
155     GenerateHmacKey(cds_file)
156
157
158 def _InitGanetiServerSetup(master_name):
159   """Setup the necessary configuration for the initial node daemon.
160
161   This creates the nodepass file containing the shared password for
162   the cluster, generates the SSL certificate and starts the node daemon.
163
164   @type master_name: str
165   @param master_name: Name of the master node
166
167   """
168   # Generate cluster secrets
169   GenerateClusterCrypto(True, False, False, False)
170
171   result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
172   if result.failed:
173     raise errors.OpExecError("Could not start the node daemon, command %s"
174                              " had exitcode %s and error %s" %
175                              (result.cmd, result.exit_code, result.output))
176
177   _WaitForNodeDaemon(master_name)
178
179
180 def _WaitForNodeDaemon(node_name):
181   """Wait for node daemon to become responsive.
182
183   """
184   def _CheckNodeDaemon():
185     result = rpc.RpcRunner.call_version([node_name])[node_name]
186     if result.fail_msg:
187       raise utils.RetryAgain()
188
189   try:
190     utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
191   except utils.RetryTimeout:
192     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
193                              " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
194
195
196 def _WaitForMasterDaemon():
197   """Wait for master daemon to become responsive.
198
199   """
200   def _CheckMasterDaemon():
201     try:
202       cl = luxi.Client()
203       (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
204     except Exception:
205       raise utils.RetryAgain()
206
207     logging.debug("Received cluster name %s from master", cluster_name)
208
209   try:
210     utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
211   except utils.RetryTimeout:
212     raise errors.OpExecError("Master daemon didn't answer queries within"
213                              " %s seconds" % _DAEMON_READY_TIMEOUT)
214
215
216 def _InitFileStorage(file_storage_dir):
217   """Initialize if needed the file storage.
218
219   @param file_storage_dir: the user-supplied value
220   @return: either empty string (if file storage was disabled at build
221       time) or the normalized path to the storage directory
222
223   """
224   if not constants.ENABLE_FILE_STORAGE:
225     return ""
226
227   file_storage_dir = os.path.normpath(file_storage_dir)
228
229   if not os.path.isabs(file_storage_dir):
230     raise errors.OpPrereqError("The file storage directory you passed is"
231                                " not an absolute path.", errors.ECODE_INVAL)
232
233   if not os.path.exists(file_storage_dir):
234     try:
235       os.makedirs(file_storage_dir, 0750)
236     except OSError, err:
237       raise errors.OpPrereqError("Cannot create file storage directory"
238                                  " '%s': %s" % (file_storage_dir, err),
239                                  errors.ECODE_ENVIRON)
240
241   if not os.path.isdir(file_storage_dir):
242     raise errors.OpPrereqError("The file storage directory '%s' is not"
243                                " a directory." % file_storage_dir,
244                                errors.ECODE_ENVIRON)
245   return file_storage_dir
246
247
248 def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
249                 master_netdev, file_storage_dir, candidate_pool_size,
250                 secondary_ip=None, vg_name=None, beparams=None,
251                 nicparams=None, ndparams=None, hvparams=None,
252                 enabled_hypervisors=None, modify_etc_hosts=True,
253                 modify_ssh_setup=True, maintain_node_health=False,
254                 drbd_helper=None, uid_pool=None, default_iallocator=None,
255                 primary_ip_version=None, prealloc_wipe_disks=False):
256   """Initialise the cluster.
257
258   @type candidate_pool_size: int
259   @param candidate_pool_size: master candidate pool size
260
261   """
262   # TODO: complete the docstring
263   if config.ConfigWriter.IsCluster():
264     raise errors.OpPrereqError("Cluster is already initialised",
265                                errors.ECODE_STATE)
266
267   if not enabled_hypervisors:
268     raise errors.OpPrereqError("Enabled hypervisors list must contain at"
269                                " least one member", errors.ECODE_INVAL)
270   invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
271   if invalid_hvs:
272     raise errors.OpPrereqError("Enabled hypervisors contains invalid"
273                                " entries: %s" % invalid_hvs,
274                                errors.ECODE_INVAL)
275
276
277   ipcls = None
278   if primary_ip_version == constants.IP4_VERSION:
279     ipcls = netutils.IP4Address
280   elif primary_ip_version == constants.IP6_VERSION:
281     ipcls = netutils.IP6Address
282   else:
283     raise errors.OpPrereqError("Invalid primary ip version: %d." %
284                                primary_ip_version)
285
286   hostname = netutils.GetHostname(family=ipcls.family)
287   if not ipcls.IsValid(hostname.ip):
288     raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
289                                " address." % (hostname.ip, primary_ip_version))
290
291   if ipcls.IsLoopback(hostname.ip):
292     raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
293                                " address. Please fix DNS or %s." %
294                                (hostname.ip, constants.ETC_HOSTS),
295                                errors.ECODE_ENVIRON)
296
297   if not ipcls.Own(hostname.ip):
298     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
299                                " to %s,\nbut this ip address does not"
300                                " belong to this host" %
301                                hostname.ip, errors.ECODE_ENVIRON)
302
303   clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
304
305   if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
306     raise errors.OpPrereqError("Cluster IP already active",
307                                errors.ECODE_NOTUNIQUE)
308
309   if not secondary_ip:
310     if primary_ip_version == constants.IP6_VERSION:
311       raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
312                                  " IPv4 address must be given as secondary",
313                                  errors.ECODE_INVAL)
314     secondary_ip = hostname.ip
315
316   if not netutils.IP4Address.IsValid(secondary_ip):
317     raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
318                                " IPv4 address." % secondary_ip,
319                                errors.ECODE_INVAL)
320
321   if not netutils.IP4Address.Own(secondary_ip):
322     raise errors.OpPrereqError("You gave %s as secondary IP,"
323                                " but it does not belong to this host." %
324                                secondary_ip, errors.ECODE_ENVIRON)
325
326   if vg_name is not None:
327     # Check if volume group is valid
328     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
329                                           constants.MIN_VG_SIZE)
330     if vgstatus:
331       raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
332                                  " you are not using lvm" % vgstatus,
333                                  errors.ECODE_INVAL)
334
335   if drbd_helper is not None:
336     try:
337       curr_helper = bdev.BaseDRBD.GetUsermodeHelper()
338     except errors.BlockDeviceError, err:
339       raise errors.OpPrereqError("Error while checking drbd helper"
340                                  " (specify --no-drbd-storage if you are not"
341                                  " using drbd): %s" % str(err),
342                                  errors.ECODE_ENVIRON)
343     if drbd_helper != curr_helper:
344       raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
345                                  " is the current helper" % (drbd_helper,
346                                                              curr_helper),
347                                  errors.ECODE_INVAL)
348
349   file_storage_dir = _InitFileStorage(file_storage_dir)
350
351   if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
352     raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
353                                errors.ECODE_INVAL)
354
355   result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
356   if result.failed:
357     raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
358                                (master_netdev,
359                                 result.output.strip()), errors.ECODE_INVAL)
360
361   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
362   utils.EnsureDirs(dirs)
363
364   utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
365   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
366   objects.NIC.CheckParameterSyntax(nicparams)
367
368   if ndparams is not None:
369     utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
370   else:
371     ndparams = dict(constants.NDC_DEFAULTS)
372
373   # hvparams is a mapping of hypervisor->hvparams dict
374   for hv_name, hv_params in hvparams.iteritems():
375     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
376     hv_class = hypervisor.GetHypervisor(hv_name)
377     hv_class.CheckParameterSyntax(hv_params)
378
379   # set up ssh config and /etc/hosts
380   sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
381   sshkey = sshline.split(" ")[1]
382
383   if modify_etc_hosts:
384     utils.AddHostToEtcHosts(hostname.name, hostname.ip)
385
386   if modify_ssh_setup:
387     _InitSSHSetup()
388
389   if default_iallocator is not None:
390     alloc_script = utils.FindFile(default_iallocator,
391                                   constants.IALLOCATOR_SEARCH_PATH,
392                                   os.path.isfile)
393     if alloc_script is None:
394       raise errors.OpPrereqError("Invalid default iallocator script '%s'"
395                                  " specified" % default_iallocator,
396                                  errors.ECODE_INVAL)
397
398   now = time.time()
399
400   # init of cluster config file
401   cluster_config = objects.Cluster(
402     serial_no=1,
403     rsahostkeypub=sshkey,
404     highest_used_port=(constants.FIRST_DRBD_PORT - 1),
405     mac_prefix=mac_prefix,
406     volume_group_name=vg_name,
407     tcpudp_port_pool=set(),
408     master_node=hostname.name,
409     master_ip=clustername.ip,
410     master_netdev=master_netdev,
411     cluster_name=clustername.name,
412     file_storage_dir=file_storage_dir,
413     shared_file_storage_dir=shared_file_storage_dir,
414     enabled_hypervisors=enabled_hypervisors,
415     beparams={constants.PP_DEFAULT: beparams},
416     nicparams={constants.PP_DEFAULT: nicparams},
417     ndparams=ndparams,
418     hvparams=hvparams,
419     candidate_pool_size=candidate_pool_size,
420     modify_etc_hosts=modify_etc_hosts,
421     modify_ssh_setup=modify_ssh_setup,
422     uid_pool=uid_pool,
423     ctime=now,
424     mtime=now,
425     maintain_node_health=maintain_node_health,
426     drbd_usermode_helper=drbd_helper,
427     default_iallocator=default_iallocator,
428     primary_ip_family=ipcls.family,
429     prealloc_wipe_disks=prealloc_wipe_disks,
430     )
431   master_node_config = objects.Node(name=hostname.name,
432                                     primary_ip=hostname.ip,
433                                     secondary_ip=secondary_ip,
434                                     serial_no=1,
435                                     master_candidate=True,
436                                     offline=False, drained=False,
437                                     ctime=now, mtime=now,
438                                     )
439   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
440   cfg = config.ConfigWriter(offline=True)
441   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
442   cfg.Update(cfg.GetClusterInfo(), logging.error)
443   backend.WriteSsconfFiles(cfg.GetSsconfValues())
444
445   # set up the inter-node password and certificate
446   _InitGanetiServerSetup(hostname.name)
447
448   logging.debug("Starting daemons")
449   result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
450   if result.failed:
451     raise errors.OpExecError("Could not start daemons, command %s"
452                              " had exitcode %s and error %s" %
453                              (result.cmd, result.exit_code, result.output))
454
455   _WaitForMasterDaemon()
456
457
458 def InitConfig(version, cluster_config, master_node_config,
459                cfg_file=constants.CLUSTER_CONF_FILE):
460   """Create the initial cluster configuration.
461
462   It will contain the current node, which will also be the master
463   node, and no instances.
464
465   @type version: int
466   @param version: configuration version
467   @type cluster_config: L{objects.Cluster}
468   @param cluster_config: cluster configuration
469   @type master_node_config: L{objects.Node}
470   @param master_node_config: master node configuration
471   @type cfg_file: string
472   @param cfg_file: configuration file path
473
474   """
475   uuid_generator = config.TemporaryReservationManager()
476   cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
477                                                 _INITCONF_ECID)
478   master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
479                                                     _INITCONF_ECID)
480   nodes = {
481     master_node_config.name: master_node_config,
482     }
483   default_nodegroup = objects.NodeGroup(
484     uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
485     name=constants.INITIAL_NODE_GROUP_NAME,
486     members=[master_node_config.name],
487     )
488   nodegroups = {
489     default_nodegroup.uuid: default_nodegroup,
490     }
491   now = time.time()
492   config_data = objects.ConfigData(version=version,
493                                    cluster=cluster_config,
494                                    nodegroups=nodegroups,
495                                    nodes=nodes,
496                                    instances={},
497                                    serial_no=1,
498                                    ctime=now, mtime=now)
499   utils.WriteFile(cfg_file,
500                   data=serializer.Dump(config_data.ToDict()),
501                   mode=0600)
502
503
504 def FinalizeClusterDestroy(master):
505   """Execute the last steps of cluster destroy
506
507   This function shuts down all the daemons, completing the destroy
508   begun in cmdlib.LUDestroyOpcode.
509
510   """
511   cfg = config.ConfigWriter()
512   modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
513   result = rpc.RpcRunner.call_node_stop_master(master, True)
514   msg = result.fail_msg
515   if msg:
516     logging.warning("Could not disable the master role: %s", msg)
517   result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
518   msg = result.fail_msg
519   if msg:
520     logging.warning("Could not shutdown the node daemon and cleanup"
521                     " the node: %s", msg)
522
523
524 def SetupNodeDaemon(cluster_name, node, ssh_key_check):
525   """Add a node to the cluster.
526
527   This function must be called before the actual opcode, and will ssh
528   to the remote node, copy the needed files, and start ganeti-noded,
529   allowing the master to do the rest via normal rpc calls.
530
531   @param cluster_name: the cluster name
532   @param node: the name of the new node
533   @param ssh_key_check: whether to do a strict key check
534
535   """
536   family = ssconf.SimpleStore().GetPrimaryIPFamily()
537   sshrunner = ssh.SshRunner(cluster_name,
538                             ipv6=(family == netutils.IP6Address.family))
539
540   bind_address = constants.IP4_ADDRESS_ANY
541   if family == netutils.IP6Address.family:
542     bind_address = constants.IP6_ADDRESS_ANY
543
544   # set up inter-node password and certificate and restarts the node daemon
545   # and then connect with ssh to set password and start ganeti-noded
546   # note that all the below variables are sanitized at this point,
547   # either by being constants or by the checks above
548   sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
549   sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
550   sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
551   mycommand = ("%s stop-all; %s start %s -b %s" %
552                (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
553                 utils.ShellQuote(bind_address)))
554
555   result = sshrunner.Run(node, 'root', mycommand, batch=False,
556                          ask_key=ssh_key_check,
557                          use_cluster_key=True,
558                          strict_host_check=ssh_key_check)
559   if result.failed:
560     raise errors.OpExecError("Remote command on node %s, error: %s,"
561                              " output: %s" %
562                              (node, result.fail_reason, result.output))
563
564   _WaitForNodeDaemon(node)
565
566
567 def MasterFailover(no_voting=False):
568   """Failover the master node.
569
570   This checks that we are not already the master, and will cause the
571   current master to cease being master, and the non-master to become
572   new master.
573
574   @type no_voting: boolean
575   @param no_voting: force the operation without remote nodes agreement
576                       (dangerous)
577
578   """
579   sstore = ssconf.SimpleStore()
580
581   old_master, new_master = ssconf.GetMasterAndMyself(sstore)
582   node_list = sstore.GetNodeList()
583   mc_list = sstore.GetMasterCandidates()
584
585   if old_master == new_master:
586     raise errors.OpPrereqError("This commands must be run on the node"
587                                " where you want the new master to be."
588                                " %s is already the master" %
589                                old_master, errors.ECODE_INVAL)
590
591   if new_master not in mc_list:
592     mc_no_master = [name for name in mc_list if name != old_master]
593     raise errors.OpPrereqError("This node is not among the nodes marked"
594                                " as master candidates. Only these nodes"
595                                " can become masters. Current list of"
596                                " master candidates is:\n"
597                                "%s" % ('\n'.join(mc_no_master)),
598                                errors.ECODE_STATE)
599
600   if not no_voting:
601     vote_list = GatherMasterVotes(node_list)
602
603     if vote_list:
604       voted_master = vote_list[0][0]
605       if voted_master is None:
606         raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
607                                    " not respond.", errors.ECODE_ENVIRON)
608       elif voted_master != old_master:
609         raise errors.OpPrereqError("I have a wrong configuration, I believe"
610                                    " the master is %s but the other nodes"
611                                    " voted %s. Please resync the configuration"
612                                    " of this node." %
613                                    (old_master, voted_master),
614                                    errors.ECODE_STATE)
615   # end checks
616
617   rcode = 0
618
619   logging.info("Setting master to %s, old master: %s", new_master, old_master)
620
621   try:
622     # instantiate a real config writer, as we now know we have the
623     # configuration data
624     cfg = config.ConfigWriter(accept_foreign=True)
625
626     cluster_info = cfg.GetClusterInfo()
627     cluster_info.master_node = new_master
628     # this will also regenerate the ssconf files, since we updated the
629     # cluster info
630     cfg.Update(cluster_info, logging.error)
631   except errors.ConfigurationError, err:
632     logging.error("Error while trying to set the new master: %s",
633                   str(err))
634     return 1
635
636   # if cfg.Update worked, then it means the old master daemon won't be
637   # able now to write its own config file (we rely on locking in both
638   # backend.UploadFile() and ConfigWriter._Write(); hence the next
639   # step is to kill the old master
640
641   logging.info("Stopping the master daemon on node %s", old_master)
642
643   result = rpc.RpcRunner.call_node_stop_master(old_master, True)
644   msg = result.fail_msg
645   if msg:
646     logging.error("Could not disable the master role on the old master"
647                  " %s, please disable manually: %s", old_master, msg)
648
649   logging.info("Checking master IP non-reachability...")
650
651   master_ip = sstore.GetMasterIP()
652   total_timeout = 30
653   # Here we have a phase where no master should be running
654   def _check_ip():
655     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
656       raise utils.RetryAgain()
657
658   try:
659     utils.Retry(_check_ip, (1, 1.5, 5), total_timeout)
660   except utils.RetryTimeout:
661     logging.warning("The master IP is still reachable after %s seconds,"
662                     " continuing but activating the master on the current"
663                     " node will probably fail", total_timeout)
664
665   if jstore.CheckDrainFlag():
666     logging.info("Undraining job queue")
667     jstore.SetDrainFlag(False)
668
669   logging.info("Starting the master daemons on the new master")
670
671   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
672   msg = result.fail_msg
673   if msg:
674     logging.error("Could not start the master role on the new master"
675                   " %s, please check: %s", new_master, msg)
676     rcode = 1
677
678   logging.info("Master failed over from %s to %s", old_master, new_master)
679   return rcode
680
681
682 def GetMaster():
683   """Returns the current master node.
684
685   This is a separate function in bootstrap since it's needed by
686   gnt-cluster, and instead of importing directly ssconf, it's better
687   to abstract it in bootstrap, where we do use ssconf in other
688   functions too.
689
690   """
691   sstore = ssconf.SimpleStore()
692
693   old_master, _ = ssconf.GetMasterAndMyself(sstore)
694
695   return old_master
696
697
698 def GatherMasterVotes(node_list):
699   """Check the agreement on who is the master.
700
701   This function will return a list of (node, number of votes), ordered
702   by the number of votes. Errors will be denoted by the key 'None'.
703
704   Note that the sum of votes is the number of nodes this machine
705   knows, whereas the number of entries in the list could be different
706   (if some nodes vote for another master).
707
708   We remove ourselves from the list since we know that (bugs aside)
709   since we use the same source for configuration information for both
710   backend and boostrap, we'll always vote for ourselves.
711
712   @type node_list: list
713   @param node_list: the list of nodes to query for master info; the current
714       node will be removed if it is in the list
715   @rtype: list
716   @return: list of (node, votes)
717
718   """
719   myself = netutils.Hostname.GetSysName()
720   try:
721     node_list.remove(myself)
722   except ValueError:
723     pass
724   if not node_list:
725     # no nodes left (eventually after removing myself)
726     return []
727   results = rpc.RpcRunner.call_master_info(node_list)
728   if not isinstance(results, dict):
729     # this should not happen (unless internal error in rpc)
730     logging.critical("Can't complete rpc call, aborting master startup")
731     return [(None, len(node_list))]
732   votes = {}
733   for node in results:
734     nres = results[node]
735     data = nres.payload
736     msg = nres.fail_msg
737     fail = False
738     if msg:
739       logging.warning("Error contacting node %s: %s", node, msg)
740       fail = True
741     # for now we accept both length 3 and 4 (data[3] is primary ip version)
742     elif not isinstance(data, (tuple, list)) or len(data) < 3:
743       logging.warning("Invalid data received from node %s: %s", node, data)
744       fail = True
745     if fail:
746       if None not in votes:
747         votes[None] = 0
748       votes[None] += 1
749       continue
750     master_node = data[2]
751     if master_node not in votes:
752       votes[master_node] = 0
753     votes[master_node] += 1
754
755   vote_list = [v for v in votes.items()]
756   # sort first on number of votes then on name, since we want None
757   # sorted later if we have the half of the nodes not responding, and
758   # half voting all for the same master
759   vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
760
761   return vote_list