Support arguments in utils.RunInSeparateProcess
[ganeti-local] / lib / bootstrap.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions to bootstrap a new cluster.
23
24 """
25
26 import os
27 import os.path
28 import re
29 import logging
30 import tempfile
31 import time
32
33 from ganeti import rpc
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import config
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import ssconf
41 from ganeti import serializer
42 from ganeti import hypervisor
43
44
45 def _InitSSHSetup():
46   """Setup the SSH configuration for the cluster.
47
48   This generates a dsa keypair for root, adds the pub key to the
49   permitted hosts and adds the hostkey to its own known hosts.
50
51   """
52   priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
53
54   for name in priv_key, pub_key:
55     if os.path.exists(name):
56       utils.CreateBackup(name)
57     utils.RemoveFile(name)
58
59   result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
60                          "-f", priv_key,
61                          "-q", "-N", ""])
62   if result.failed:
63     raise errors.OpExecError("Could not generate ssh keypair, error %s" %
64                              result.output)
65
66   utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
67
68
69 def GenerateSelfSignedSslCert(file_name, validity=(365 * 5)):
70   """Generates a self-signed SSL certificate.
71
72   @type file_name: str
73   @param file_name: Path to output file
74   @type validity: int
75   @param validity: Validity for certificate in days
76
77   """
78   (fd, tmp_file_name) = tempfile.mkstemp(dir=os.path.dirname(file_name))
79   try:
80     try:
81       # Set permissions before writing key
82       os.chmod(tmp_file_name, 0600)
83
84       result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
85                              "-days", str(validity), "-nodes", "-x509",
86                              "-keyout", tmp_file_name, "-out", tmp_file_name,
87                              "-batch"])
88       if result.failed:
89         raise errors.OpExecError("Could not generate SSL certificate, command"
90                                  " %s had exitcode %s and error message %s" %
91                                  (result.cmd, result.exit_code, result.output))
92
93       # Make read-only
94       os.chmod(tmp_file_name, 0400)
95
96       os.rename(tmp_file_name, file_name)
97     finally:
98       utils.RemoveFile(tmp_file_name)
99   finally:
100     os.close(fd)
101
102
103 def GenerateHmacKey(file_name):
104   """Writes a new HMAC key.
105
106   @type file_name: str
107   @param file_name: Path to output file
108
109   """
110   utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400)
111
112
113 def _InitGanetiServerSetup(master_name):
114   """Setup the necessary configuration for the initial node daemon.
115
116   This creates the nodepass file containing the shared password for
117   the cluster and also generates the SSL certificate.
118
119   """
120   GenerateSelfSignedSslCert(constants.SSL_CERT_FILE)
121
122   # Don't overwrite existing file
123   if not os.path.exists(constants.RAPI_CERT_FILE):
124     GenerateSelfSignedSslCert(constants.RAPI_CERT_FILE)
125
126   if not os.path.exists(constants.HMAC_CLUSTER_KEY):
127     GenerateHmacKey(constants.HMAC_CLUSTER_KEY)
128
129   result = utils.RunCmd([constants.DAEMON_UTIL, "start", constants.NODED])
130   if result.failed:
131     raise errors.OpExecError("Could not start the node daemon, command %s"
132                              " had exitcode %s and error %s" %
133                              (result.cmd, result.exit_code, result.output))
134
135   _WaitForNodeDaemon(master_name)
136
137
138 def _WaitForNodeDaemon(node_name):
139   """Wait for node daemon to become responsive.
140
141   """
142   def _CheckNodeDaemon():
143     result = rpc.RpcRunner.call_version([node_name])[node_name]
144     if result.fail_msg:
145       raise utils.RetryAgain()
146
147   try:
148     utils.Retry(_CheckNodeDaemon, 1.0, 10.0)
149   except utils.RetryTimeout:
150     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
151                              " 10 seconds" % node_name)
152
153
154 def InitCluster(cluster_name, mac_prefix,
155                 master_netdev, file_storage_dir, candidate_pool_size,
156                 secondary_ip=None, vg_name=None, beparams=None,
157                 nicparams=None, hvparams=None, enabled_hypervisors=None,
158                 modify_etc_hosts=True, modify_ssh_setup=True):
159   """Initialise the cluster.
160
161   @type candidate_pool_size: int
162   @param candidate_pool_size: master candidate pool size
163
164   """
165   # TODO: complete the docstring
166   if config.ConfigWriter.IsCluster():
167     raise errors.OpPrereqError("Cluster is already initialised",
168                                errors.ECODE_STATE)
169
170   if not enabled_hypervisors:
171     raise errors.OpPrereqError("Enabled hypervisors list must contain at"
172                                " least one member", errors.ECODE_INVAL)
173   invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
174   if invalid_hvs:
175     raise errors.OpPrereqError("Enabled hypervisors contains invalid"
176                                " entries: %s" % invalid_hvs,
177                                errors.ECODE_INVAL)
178
179   hostname = utils.GetHostInfo()
180
181   if hostname.ip.startswith("127."):
182     raise errors.OpPrereqError("This host's IP resolves to the private"
183                                " range (%s). Please fix DNS or %s." %
184                                (hostname.ip, constants.ETC_HOSTS),
185                                errors.ECODE_ENVIRON)
186
187   if not utils.OwnIpAddress(hostname.ip):
188     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
189                                " to %s,\nbut this ip address does not"
190                                " belong to this host. Aborting." %
191                                hostname.ip, errors.ECODE_ENVIRON)
192
193   clustername = utils.GetHostInfo(utils.HostInfo.NormalizeName(cluster_name))
194
195   if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
196                    timeout=5):
197     raise errors.OpPrereqError("Cluster IP already active. Aborting.",
198                                errors.ECODE_NOTUNIQUE)
199
200   if secondary_ip:
201     if not utils.IsValidIP(secondary_ip):
202       raise errors.OpPrereqError("Invalid secondary ip given",
203                                  errors.ECODE_INVAL)
204     if (secondary_ip != hostname.ip and
205         not utils.OwnIpAddress(secondary_ip)):
206       raise errors.OpPrereqError("You gave %s as secondary IP,"
207                                  " but it does not belong to this host." %
208                                  secondary_ip, errors.ECODE_ENVIRON)
209   else:
210     secondary_ip = hostname.ip
211
212   if vg_name is not None:
213     # Check if volume group is valid
214     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
215                                           constants.MIN_VG_SIZE)
216     if vgstatus:
217       raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
218                                  " you are not using lvm" % vgstatus,
219                                  errors.ECODE_INVAL)
220
221   file_storage_dir = os.path.normpath(file_storage_dir)
222
223   if not os.path.isabs(file_storage_dir):
224     raise errors.OpPrereqError("The file storage directory you passed is"
225                                " not an absolute path.", errors.ECODE_INVAL)
226
227   if not os.path.exists(file_storage_dir):
228     try:
229       os.makedirs(file_storage_dir, 0750)
230     except OSError, err:
231       raise errors.OpPrereqError("Cannot create file storage directory"
232                                  " '%s': %s" % (file_storage_dir, err),
233                                  errors.ECODE_ENVIRON)
234
235   if not os.path.isdir(file_storage_dir):
236     raise errors.OpPrereqError("The file storage directory '%s' is not"
237                                " a directory." % file_storage_dir,
238                                errors.ECODE_ENVIRON)
239
240   if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
241     raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
242                                errors.ECODE_INVAL)
243
244   result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
245   if result.failed:
246     raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
247                                (master_netdev,
248                                 result.output.strip()), errors.ECODE_INVAL)
249
250   dirs = [(constants.RUN_GANETI_DIR, constants.RUN_DIRS_MODE)]
251   utils.EnsureDirs(dirs)
252
253   utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
254   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
255   objects.NIC.CheckParameterSyntax(nicparams)
256
257   # hvparams is a mapping of hypervisor->hvparams dict
258   for hv_name, hv_params in hvparams.iteritems():
259     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
260     hv_class = hypervisor.GetHypervisor(hv_name)
261     hv_class.CheckParameterSyntax(hv_params)
262
263   # set up the inter-node password and certificate
264   _InitGanetiServerSetup(hostname.name)
265
266   # set up ssh config and /etc/hosts
267   sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
268   sshkey = sshline.split(" ")[1]
269
270   if modify_etc_hosts:
271     utils.AddHostToEtcHosts(hostname.name)
272
273   if modify_ssh_setup:
274     _InitSSHSetup()
275
276   now = time.time()
277
278   # init of cluster config file
279   cluster_config = objects.Cluster(
280     serial_no=1,
281     rsahostkeypub=sshkey,
282     highest_used_port=(constants.FIRST_DRBD_PORT - 1),
283     mac_prefix=mac_prefix,
284     volume_group_name=vg_name,
285     tcpudp_port_pool=set(),
286     master_node=hostname.name,
287     master_ip=clustername.ip,
288     master_netdev=master_netdev,
289     cluster_name=clustername.name,
290     file_storage_dir=file_storage_dir,
291     enabled_hypervisors=enabled_hypervisors,
292     beparams={constants.PP_DEFAULT: beparams},
293     nicparams={constants.PP_DEFAULT: nicparams},
294     hvparams=hvparams,
295     candidate_pool_size=candidate_pool_size,
296     modify_etc_hosts=modify_etc_hosts,
297     modify_ssh_setup=modify_ssh_setup,
298     ctime=now,
299     mtime=now,
300     uuid=utils.NewUUID(),
301     )
302   master_node_config = objects.Node(name=hostname.name,
303                                     primary_ip=hostname.ip,
304                                     secondary_ip=secondary_ip,
305                                     serial_no=1,
306                                     master_candidate=True,
307                                     offline=False, drained=False,
308                                     )
309   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
310   cfg = config.ConfigWriter()
311   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
312   cfg.Update(cfg.GetClusterInfo(), logging.error)
313
314   # start the master ip
315   # TODO: Review rpc call from bootstrap
316   # TODO: Warn on failed start master
317   rpc.RpcRunner.call_node_start_master(hostname.name, True, False)
318
319
320 def InitConfig(version, cluster_config, master_node_config,
321                cfg_file=constants.CLUSTER_CONF_FILE):
322   """Create the initial cluster configuration.
323
324   It will contain the current node, which will also be the master
325   node, and no instances.
326
327   @type version: int
328   @param version: configuration version
329   @type cluster_config: L{objects.Cluster}
330   @param cluster_config: cluster configuration
331   @type master_node_config: L{objects.Node}
332   @param master_node_config: master node configuration
333   @type cfg_file: string
334   @param cfg_file: configuration file path
335
336   """
337   nodes = {
338     master_node_config.name: master_node_config,
339     }
340
341   now = time.time()
342   config_data = objects.ConfigData(version=version,
343                                    cluster=cluster_config,
344                                    nodes=nodes,
345                                    instances={},
346                                    serial_no=1,
347                                    ctime=now, mtime=now)
348   utils.WriteFile(cfg_file,
349                   data=serializer.Dump(config_data.ToDict()),
350                   mode=0600)
351
352
353 def FinalizeClusterDestroy(master):
354   """Execute the last steps of cluster destroy
355
356   This function shuts down all the daemons, completing the destroy
357   begun in cmdlib.LUDestroyOpcode.
358
359   """
360   cfg = config.ConfigWriter()
361   modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
362   result = rpc.RpcRunner.call_node_stop_master(master, True)
363   msg = result.fail_msg
364   if msg:
365     logging.warning("Could not disable the master role: %s", msg)
366   result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
367   msg = result.fail_msg
368   if msg:
369     logging.warning("Could not shutdown the node daemon and cleanup"
370                     " the node: %s", msg)
371
372
373 def SetupNodeDaemon(cluster_name, node, ssh_key_check):
374   """Add a node to the cluster.
375
376   This function must be called before the actual opcode, and will ssh
377   to the remote node, copy the needed files, and start ganeti-noded,
378   allowing the master to do the rest via normal rpc calls.
379
380   @param cluster_name: the cluster name
381   @param node: the name of the new node
382   @param ssh_key_check: whether to do a strict key check
383
384   """
385   sshrunner = ssh.SshRunner(cluster_name)
386
387   noded_cert = utils.ReadFile(constants.SSL_CERT_FILE)
388   rapi_cert = utils.ReadFile(constants.RAPI_CERT_FILE)
389   hmac_key = utils.ReadFile(constants.HMAC_CLUSTER_KEY)
390
391   # in the base64 pem encoding, neither '!' nor '.' are valid chars,
392   # so we use this to detect an invalid certificate; as long as the
393   # cert doesn't contain this, the here-document will be correctly
394   # parsed by the shell sequence below. HMAC keys are hexadecimal strings,
395   # so the same restrictions apply.
396   for content in (noded_cert, rapi_cert, hmac_key):
397     if re.search('^!EOF\.', content, re.MULTILINE):
398       raise errors.OpExecError("invalid SSL certificate or HMAC key")
399
400   if not noded_cert.endswith("\n"):
401     noded_cert += "\n"
402   if not rapi_cert.endswith("\n"):
403     rapi_cert += "\n"
404   if not hmac_key.endswith("\n"):
405     hmac_key += "\n"
406
407   # set up inter-node password and certificate and restarts the node daemon
408   # and then connect with ssh to set password and start ganeti-noded
409   # note that all the below variables are sanitized at this point,
410   # either by being constants or by the checks above
411   mycommand = ("umask 077 && "
412                "cat > '%s' << '!EOF.' && \n"
413                "%s!EOF.\n"
414                "cat > '%s' << '!EOF.' && \n"
415                "%s!EOF.\n"
416                "cat > '%s' << '!EOF.' && \n"
417                "%s!EOF.\n"
418                "chmod 0400 %s %s %s && "
419                "%s start %s" %
420                (constants.SSL_CERT_FILE, noded_cert,
421                 constants.RAPI_CERT_FILE, rapi_cert,
422                 constants.HMAC_CLUSTER_KEY, hmac_key,
423                 constants.SSL_CERT_FILE, constants.RAPI_CERT_FILE,
424                 constants.HMAC_CLUSTER_KEY,
425                 constants.DAEMON_UTIL, constants.NODED))
426
427   result = sshrunner.Run(node, 'root', mycommand, batch=False,
428                          ask_key=ssh_key_check,
429                          use_cluster_key=False,
430                          strict_host_check=ssh_key_check)
431   if result.failed:
432     raise errors.OpExecError("Remote command on node %s, error: %s,"
433                              " output: %s" %
434                              (node, result.fail_reason, result.output))
435
436   _WaitForNodeDaemon(node)
437
438
439 def MasterFailover(no_voting=False):
440   """Failover the master node.
441
442   This checks that we are not already the master, and will cause the
443   current master to cease being master, and the non-master to become
444   new master.
445
446   @type no_voting: boolean
447   @param no_voting: force the operation without remote nodes agreement
448                       (dangerous)
449
450   """
451   sstore = ssconf.SimpleStore()
452
453   old_master, new_master = ssconf.GetMasterAndMyself(sstore)
454   node_list = sstore.GetNodeList()
455   mc_list = sstore.GetMasterCandidates()
456
457   if old_master == new_master:
458     raise errors.OpPrereqError("This commands must be run on the node"
459                                " where you want the new master to be."
460                                " %s is already the master" %
461                                old_master, errors.ECODE_INVAL)
462
463   if new_master not in mc_list:
464     mc_no_master = [name for name in mc_list if name != old_master]
465     raise errors.OpPrereqError("This node is not among the nodes marked"
466                                " as master candidates. Only these nodes"
467                                " can become masters. Current list of"
468                                " master candidates is:\n"
469                                "%s" % ('\n'.join(mc_no_master)),
470                                errors.ECODE_STATE)
471
472   if not no_voting:
473     vote_list = GatherMasterVotes(node_list)
474
475     if vote_list:
476       voted_master = vote_list[0][0]
477       if voted_master is None:
478         raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
479                                    " not respond.", errors.ECODE_ENVIRON)
480       elif voted_master != old_master:
481         raise errors.OpPrereqError("I have a wrong configuration, I believe"
482                                    " the master is %s but the other nodes"
483                                    " voted %s. Please resync the configuration"
484                                    " of this node." %
485                                    (old_master, voted_master),
486                                    errors.ECODE_STATE)
487   # end checks
488
489   rcode = 0
490
491   logging.info("Setting master to %s, old master: %s", new_master, old_master)
492
493   result = rpc.RpcRunner.call_node_stop_master(old_master, True)
494   msg = result.fail_msg
495   if msg:
496     logging.error("Could not disable the master role on the old master"
497                  " %s, please disable manually: %s", old_master, msg)
498
499   # Here we have a phase where no master should be running
500
501   # instantiate a real config writer, as we now know we have the
502   # configuration data
503   cfg = config.ConfigWriter()
504
505   cluster_info = cfg.GetClusterInfo()
506   cluster_info.master_node = new_master
507   # this will also regenerate the ssconf files, since we updated the
508   # cluster info
509   cfg.Update(cluster_info, logging.error)
510
511   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
512   msg = result.fail_msg
513   if msg:
514     logging.error("Could not start the master role on the new master"
515                   " %s, please check: %s", new_master, msg)
516     rcode = 1
517
518   return rcode
519
520
521 def GetMaster():
522   """Returns the current master node.
523
524   This is a separate function in bootstrap since it's needed by
525   gnt-cluster, and instead of importing directly ssconf, it's better
526   to abstract it in bootstrap, where we do use ssconf in other
527   functions too.
528
529   """
530   sstore = ssconf.SimpleStore()
531
532   old_master, _ = ssconf.GetMasterAndMyself(sstore)
533
534   return old_master
535
536
537 def GatherMasterVotes(node_list):
538   """Check the agreement on who is the master.
539
540   This function will return a list of (node, number of votes), ordered
541   by the number of votes. Errors will be denoted by the key 'None'.
542
543   Note that the sum of votes is the number of nodes this machine
544   knows, whereas the number of entries in the list could be different
545   (if some nodes vote for another master).
546
547   We remove ourselves from the list since we know that (bugs aside)
548   since we use the same source for configuration information for both
549   backend and boostrap, we'll always vote for ourselves.
550
551   @type node_list: list
552   @param node_list: the list of nodes to query for master info; the current
553       node will be removed if it is in the list
554   @rtype: list
555   @return: list of (node, votes)
556
557   """
558   myself = utils.HostInfo().name
559   try:
560     node_list.remove(myself)
561   except ValueError:
562     pass
563   if not node_list:
564     # no nodes left (eventually after removing myself)
565     return []
566   results = rpc.RpcRunner.call_master_info(node_list)
567   if not isinstance(results, dict):
568     # this should not happen (unless internal error in rpc)
569     logging.critical("Can't complete rpc call, aborting master startup")
570     return [(None, len(node_list))]
571   votes = {}
572   for node in results:
573     nres = results[node]
574     data = nres.payload
575     msg = nres.fail_msg
576     fail = False
577     if msg:
578       logging.warning("Error contacting node %s: %s", node, msg)
579       fail = True
580     elif not isinstance(data, (tuple, list)) or len(data) < 3:
581       logging.warning("Invalid data received from node %s: %s", node, data)
582       fail = True
583     if fail:
584       if None not in votes:
585         votes[None] = 0
586       votes[None] += 1
587       continue
588     master_node = data[2]
589     if master_node not in votes:
590       votes[master_node] = 0
591     votes[master_node] += 1
592
593   vote_list = [v for v in votes.items()]
594   # sort first on number of votes then on name, since we want None
595   # sorted later if we have the half of the nodes not responding, and
596   # half voting all for the same master
597   vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
598
599   return vote_list