Add --force-join option to gnt-node add
[ganeti-local] / lib / bootstrap.py
index 44d8f3d..d18e68b 100644
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -40,6 +40,16 @@ from ganeti import ssconf
 from ganeti import serializer
 from ganeti import hypervisor
 from ganeti import bdev
+from ganeti import netutils
+from ganeti import backend
+from ganeti import luxi
+
+
+# ec_id for InitConfig's temporary reservation manager
+_INITCONF_ECID = "initconfig-ecid"
+
+#: Timeout (in seconds) within which a daemon must become responsive
+_DAEMON_READY_TIMEOUT = 10.0
 
 
 def _InitSSHSetup():
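The two new constants parameterize the utils.Retry polling idiom used by the
wait helpers in this file. A minimal sketch of that idiom, with a hypothetical
probe standing in for the real check functions below:

    def _check_ready():            # hypothetical probe function
      if not _is_ready():          # hypothetical readiness test
        raise utils.RetryAgain()   # tell utils.Retry to poll again

    try:
      # poll once per second, give up after _DAEMON_READY_TIMEOUT seconds
      utils.Retry(_check_ready, 1.0, _DAEMON_READY_TIMEOUT)
    except utils.RetryTimeout:
      raise errors.OpExecError("daemon did not become responsive in time")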
@@ -148,7 +158,10 @@ def _InitGanetiServerSetup(master_name):
   """Setup the necessary configuration for the initial node daemon.
 
   This creates the nodepass file containing the shared password for
   """Setup the necessary configuration for the initial node daemon.
 
   This creates the nodepass file containing the shared password for
-  the cluster and also generates the SSL certificate.
+  the cluster, generates the SSL certificate and starts the node daemon.
+
+  @type master_name: str
+  @param master_name: Name of the master node
 
   """
   # Generate cluster secrets
@@ -173,10 +186,30 @@ def _WaitForNodeDaemon(node_name):
       raise utils.RetryAgain()
 
   try:
-    utils.Retry(_CheckNodeDaemon, 1.0, 10.0)
+    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
   except utils.RetryTimeout:
     raise errors.OpExecError("Node daemon on %s didn't answer queries within"
-                             " 10 seconds" % node_name)
+                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
+
+
+def _WaitForMasterDaemon():
+  """Wait for master daemon to become responsive.
+
+  """
+  def _CheckMasterDaemon():
+    try:
+      cl = luxi.Client()
+      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
+    except Exception:
+      raise utils.RetryAgain()
+
+    logging.debug("Received cluster name %s from master", cluster_name)
+
+  try:
+    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
+  except utils.RetryTimeout:
+    raise errors.OpExecError("Master daemon didn't answer queries within"
+                             " %s seconds" % _DAEMON_READY_TIMEOUT)
 
 
 def _InitFileStorage(file_storage_dir):
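Note that _WaitForMasterDaemon is called at the end of InitCluster (below),
right after "daemon-util start-all"; condensed, the call site reads:

    # Condensed from the InitCluster hunk later in this patch:
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
    if result.failed:
      raise errors.OpExecError("Could not start daemons: %s" % result.output)
    _WaitForMasterDaemon()  # blocks until masterd answers luxi queries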
@@ -211,14 +244,14 @@ def _InitFileStorage(file_storage_dir):
   return file_storage_dir
 
 
-#pylint: disable-msg=R0913
-def InitCluster(cluster_name, mac_prefix,
+def InitCluster(cluster_name, mac_prefix, # pylint: disable-msg=R0913
                 master_netdev, file_storage_dir, candidate_pool_size,
                 secondary_ip=None, vg_name=None, beparams=None,
-                nicparams=None, hvparams=None, enabled_hypervisors=None,
-                modify_etc_hosts=True, modify_ssh_setup=True,
-                maintain_node_health=False, drbd_helper=None,
-                uid_pool=None):
+                nicparams=None, ndparams=None, hvparams=None,
+                enabled_hypervisors=None, modify_etc_hosts=True,
+                modify_ssh_setup=True, maintain_node_health=False,
+                drbd_helper=None, uid_pool=None, default_iallocator=None,
+                primary_ip_version=None, prealloc_wipe_disks=False):
   """Initialise the cluster.
 
   @type candidate_pool_size: int
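The signature gains four keyword arguments. A hypothetical invocation with
illustrative values (primary_ip_version is effectively required, since None is
rejected below; the pre-existing parameters keep their meaning):

    InitCluster("cluster.example.com", "aa:00:00",         # values illustrative
                master_netdev="eth0",
                file_storage_dir="/srv/ganeti/file-storage",
                candidate_pool_size=10,
                primary_ip_version=constants.IP4_VERSION,  # new
                ndparams=None,                 # new: node parameter defaults
                default_iallocator=None,       # new: cluster-wide iallocator
                prealloc_wipe_disks=False,     # new: wipe disks before use
                # beparams/nicparams/hvparams/enabled_hypervisors as before
                )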
   """Initialise the cluster.
 
   @type candidate_pool_size: int
@@ -239,39 +272,56 @@ def InitCluster(cluster_name, mac_prefix,
                                " entries: %s" % invalid_hvs,
                                errors.ECODE_INVAL)
 
                                " entries: %s" % invalid_hvs,
                                errors.ECODE_INVAL)
 
-  hostname = utils.GetHostInfo()
-
-  if hostname.ip.startswith("127."):
-    raise errors.OpPrereqError("This host's IP resolves to the private"
-                               " range (%s). Please fix DNS or %s." %
+  ipcls = None
+  if primary_ip_version == constants.IP4_VERSION:
+    ipcls = netutils.IP4Address
+  elif primary_ip_version == constants.IP6_VERSION:
+    ipcls = netutils.IP6Address
+  else:
+    raise errors.OpPrereqError("Invalid primary ip version: %d." %
+                               primary_ip_version)
+
+  hostname = netutils.GetHostname(family=ipcls.family)
+  if not ipcls.IsValid(hostname.ip):
+    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
+                               " address." % (hostname.ip, primary_ip_version))
+
+  if ipcls.IsLoopback(hostname.ip):
+    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
+                               " address. Please fix DNS or %s." %
                                (hostname.ip, constants.ETC_HOSTS),
                                errors.ECODE_ENVIRON)
 
-  if not utils.OwnIpAddress(hostname.ip):
+  if not ipcls.Own(hostname.ip):
     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                " to %s,\nbut this ip address does not"
     raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                " to %s,\nbut this ip address does not"
-                               " belong to this host. Aborting." %
+                               " belong to this host" %
                                hostname.ip, errors.ECODE_ENVIRON)
 
-  clustername = utils.GetHostInfo(utils.HostInfo.NormalizeName(cluster_name))
+  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
 
-  if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
-                   timeout=5):
-    raise errors.OpPrereqError("Cluster IP already active. Aborting.",
+  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
+    raise errors.OpPrereqError("Cluster IP already active",
                                errors.ECODE_NOTUNIQUE)
 
-  if secondary_ip:
-    if not utils.IsValidIP4(secondary_ip):
-      raise errors.OpPrereqError("Invalid secondary ip given",
+  if not secondary_ip:
+    if primary_ip_version == constants.IP6_VERSION:
+      raise errors.OpPrereqError("When using a IPv6 primary address, a valid"
+                                 " IPv4 address must be given as secondary",
                                  errors.ECODE_INVAL)
-    if (secondary_ip != hostname.ip and
-        not utils.OwnIpAddress(secondary_ip)):
-      raise errors.OpPrereqError("You gave %s as secondary IP,"
-                                 " but it does not belong to this host." %
-                                 secondary_ip, errors.ECODE_ENVIRON)
-  else:
     secondary_ip = hostname.ip
 
+  if not netutils.IP4Address.IsValid(secondary_ip):
+    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
+                               " IPv4 address." % secondary_ip,
+                               errors.ECODE_INVAL)
+
+  if not netutils.IP4Address.Own(secondary_ip):
+    raise errors.OpPrereqError("You gave %s as secondary IP,"
+                               " but it does not belong to this host." %
+                               secondary_ip, errors.ECODE_ENVIRON)
+
   if vg_name is not None:
     # Check if volume group is valid
     vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
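Condensed, the address checks above pivot on one address class chosen from the
primary IP version, while the secondary IP is always validated as IPv4:

    # Sketch of the dispatch and checks introduced above:
    ipcls = {constants.IP4_VERSION: netutils.IP4Address,
             constants.IP6_VERSION: netutils.IP6Address}[primary_ip_version]
    hostname = netutils.GetHostname(family=ipcls.family)
    assert ipcls.IsValid(hostname.ip)          # syntactically valid address
    assert not ipcls.IsLoopback(hostname.ip)   # not 127.0.0.1 or ::1
    assert ipcls.Own(hostname.ip)              # configured on this host
    assert netutils.IP4Address.IsValid(secondary_ip)  # secondary is IPv4-only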
@@ -314,25 +364,36 @@ def InitCluster(cluster_name, mac_prefix,
   utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
   objects.NIC.CheckParameterSyntax(nicparams)
 
+  if ndparams is not None:
+    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
+  else:
+    ndparams = dict(constants.NDC_DEFAULTS)
+
   # hvparams is a mapping of hypervisor->hvparams dict
   for hv_name, hv_params in hvparams.iteritems():
     utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
     hv_class = hypervisor.GetHypervisor(hv_name)
     hv_class.CheckParameterSyntax(hv_params)
 
-  # set up the inter-node password and certificate
-  _InitGanetiServerSetup(hostname.name)
-
   # set up ssh config and /etc/hosts
   sshline = utils.ReadFile(constants.SSH_HOST_RSA_PUB)
   sshkey = sshline.split(" ")[1]
 
   if modify_etc_hosts:
-    utils.AddHostToEtcHosts(hostname.name)
+    utils.AddHostToEtcHosts(hostname.name, hostname.ip)
 
   if modify_ssh_setup:
     _InitSSHSetup()
 
+  if default_iallocator is not None:
+    alloc_script = utils.FindFile(default_iallocator,
+                                  constants.IALLOCATOR_SEARCH_PATH,
+                                  os.path.isfile)
+    if alloc_script is None:
+      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
+                                 " specified" % default_iallocator,
+                                 errors.ECODE_INVAL)
+
   now = time.time()
 
   # init of cluster config file
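The default_iallocator check relies on utils.FindFile, which resolves a name
against the iallocator search path and returns None when nothing matches; for
example (the script name is illustrative):

    # "hail" is illustrative; a name not found on the path yields None
    alloc_script = utils.FindFile("hail",
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script",
                                 errors.ECODE_INVAL)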
@@ -351,6 +412,7 @@ def InitCluster(cluster_name, mac_prefix,
     enabled_hypervisors=enabled_hypervisors,
     beparams={constants.PP_DEFAULT: beparams},
     nicparams={constants.PP_DEFAULT: nicparams},
+    ndparams=ndparams,
     hvparams=hvparams,
     candidate_pool_size=candidate_pool_size,
     modify_etc_hosts=modify_etc_hosts,
@@ -358,9 +420,11 @@ def InitCluster(cluster_name, mac_prefix,
     uid_pool=uid_pool,
     ctime=now,
     mtime=now,
-    uuid=utils.NewUUID(),
     maintain_node_health=maintain_node_health,
     drbd_usermode_helper=drbd_helper,
+    default_iallocator=default_iallocator,
+    primary_ip_family=ipcls.family,
+    prealloc_wipe_disks=prealloc_wipe_disks,
     )
   master_node_config = objects.Node(name=hostname.name,
                                     primary_ip=hostname.ip,
@@ -368,16 +432,25 @@ def InitCluster(cluster_name, mac_prefix,
                                     serial_no=1,
                                     master_candidate=True,
                                     offline=False, drained=False,
+                                    ctime=now, mtime=now,
                                     )
   InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
-  cfg = config.ConfigWriter()
+  cfg = config.ConfigWriter(offline=True)
   ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
   cfg.Update(cfg.GetClusterInfo(), logging.error)
+  backend.WriteSsconfFiles(cfg.GetSsconfValues())
+
+  # set up the inter-node password and certificate
+  _InitGanetiServerSetup(hostname.name)
+
+  logging.debug("Starting daemons")
+  result = utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
+  if result.failed:
+    raise errors.OpExecError("Could not start daemons, command %s"
+                             " had exitcode %s and error %s" %
+                             (result.cmd, result.exit_code, result.output))
 
-  # start the master ip
-  # TODO: Review rpc call from bootstrap
-  # TODO: Warn on failed start master
-  rpc.RpcRunner.call_node_start_master(hostname.name, True, False)
+  _WaitForMasterDaemon()
 
 
 def InitConfig(version, cluster_config, master_node_config,
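The tail of InitCluster is now strictly ordered: write the configuration with
an offline writer (no daemons are running yet), materialize the ssconf files,
generate secrets (which also restarts noded), then start all daemons and wait
for masterd. Condensed:

    # Ordering of the new InitCluster tail (condensed from the hunk above):
    cfg = config.ConfigWriter(offline=True)    # no daemons running yet
    ssh.WriteKnownHostsFile(cfg, constants.SSH_KNOWN_HOSTS_FILE)
    cfg.Update(cfg.GetClusterInfo(), logging.error)
    backend.WriteSsconfFiles(cfg.GetSsconfValues())
    _InitGanetiServerSetup(hostname.name)      # certificates; restarts noded
    utils.RunCmd([constants.DAEMON_UTIL, "start-all"])
    _WaitForMasterDaemon()   # replaces the old node_start_master RPC call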
@@ -397,13 +470,26 @@ def InitConfig(version, cluster_config, master_node_config,
   @param cfg_file: configuration file path
 
   """
+  uuid_generator = config.TemporaryReservationManager()
+  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
+                                                _INITCONF_ECID)
+  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
+                                                    _INITCONF_ECID)
   nodes = {
     master_node_config.name: master_node_config,
     }
-
+  default_nodegroup = objects.NodeGroup(
+    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
+    name=constants.INITIAL_NODE_GROUP_NAME,
+    members=[master_node_config.name],
+    )
+  nodegroups = {
+    default_nodegroup.uuid: default_nodegroup,
+    }
   now = time.time()
   config_data = objects.ConfigData(version=version,
                                    cluster=cluster_config,
+                                   nodegroups=nodegroups,
                                    nodes=nodes,
                                    instances={},
                                    serial_no=1,
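UUIDs for the cluster, the master node and the initial node group are all
drawn from one TemporaryReservationManager keyed by _INITCONF_ECID, so
repeated calls avoid handing out the same value twice:

    # Sketch of the generation pattern used above:
    uuid_generator = config.TemporaryReservationManager()
    new_uuid = uuid_generator.Generate([],              # values to avoid
                                       utils.NewUUID,   # generator callback
                                       _INITCONF_ECID)  # execution-context id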
@@ -445,52 +531,28 @@ def SetupNodeDaemon(cluster_name, node, ssh_key_check):
   @param ssh_key_check: whether to do a strict key check
 
   """
-  sshrunner = ssh.SshRunner(cluster_name)
-
-  noded_cert = utils.ReadFile(constants.NODED_CERT_FILE)
-  rapi_cert = utils.ReadFile(constants.RAPI_CERT_FILE)
-  confd_hmac_key = utils.ReadFile(constants.CONFD_HMAC_KEY)
-
-  # in the base64 pem encoding, neither '!' nor '.' are valid chars,
-  # so we use this to detect an invalid certificate; as long as the
-  # cert doesn't contain this, the here-document will be correctly
-  # parsed by the shell sequence below. HMAC keys are hexadecimal strings,
-  # so the same restrictions apply.
-  for content in (noded_cert, rapi_cert, confd_hmac_key):
-    if re.search('^!EOF\.', content, re.MULTILINE):
-      raise errors.OpExecError("invalid SSL certificate or HMAC key")
-
-  if not noded_cert.endswith("\n"):
-    noded_cert += "\n"
-  if not rapi_cert.endswith("\n"):
-    rapi_cert += "\n"
-  if not confd_hmac_key.endswith("\n"):
-    confd_hmac_key += "\n"
+  family = ssconf.SimpleStore().GetPrimaryIPFamily()
+  sshrunner = ssh.SshRunner(cluster_name,
+                            ipv6=(family == netutils.IP6Address.family))
+
+  bind_address = constants.IP4_ADDRESS_ANY
+  if family == netutils.IP6Address.family:
+    bind_address = constants.IP6_ADDRESS_ANY
 
   # set up inter-node password and certificate and restarts the node daemon
   # and then connect with ssh to set password and start ganeti-noded
   # note that all the below variables are sanitized at this point,
   # either by being constants or by the checks above
-  # TODO: Could this command exceed a shell's maximum command length?
-  mycommand = ("umask 077 && "
-               "cat > '%s' << '!EOF.' && \n"
-               "%s!EOF.\n"
-               "cat > '%s' << '!EOF.' && \n"
-               "%s!EOF.\n"
-               "cat > '%s' << '!EOF.' && \n"
-               "%s!EOF.\n"
-               "chmod 0400 %s %s %s && "
-               "%s start %s" %
-               (constants.NODED_CERT_FILE, noded_cert,
-                constants.RAPI_CERT_FILE, rapi_cert,
-                constants.CONFD_HMAC_KEY, confd_hmac_key,
-                constants.NODED_CERT_FILE, constants.RAPI_CERT_FILE,
-                constants.CONFD_HMAC_KEY,
-                constants.DAEMON_UTIL, constants.NODED))
+  sshrunner.CopyFileToNode(node, constants.NODED_CERT_FILE)
+  sshrunner.CopyFileToNode(node, constants.RAPI_CERT_FILE)
+  sshrunner.CopyFileToNode(node, constants.CONFD_HMAC_KEY)
+  mycommand = ("%s stop-all; %s start %s -b %s" %
+               (constants.DAEMON_UTIL, constants.DAEMON_UTIL, constants.NODED,
+                utils.ShellQuote(bind_address)))
 
   result = sshrunner.Run(node, 'root', mycommand, batch=False,
                          ask_key=ssh_key_check,
-                         use_cluster_key=False,
+                         use_cluster_key=True,
                          strict_host_check=ssh_key_check)
   if result.failed:
     raise errors.OpExecError("Remote command on node %s, error: %s,"
@@ -554,17 +616,41 @@ def MasterFailover(no_voting=False):
 
   logging.info("Setting master to %s, old master: %s", new_master, old_master)
 
 
   logging.info("Setting master to %s, old master: %s", new_master, old_master)
 
+  try:
+    # instantiate a real config writer, as we now know we have the
+    # configuration data
+    cfg = config.ConfigWriter(accept_foreign=True)
+
+    cluster_info = cfg.GetClusterInfo()
+    cluster_info.master_node = new_master
+    # this will also regenerate the ssconf files, since we updated the
+    # cluster info
+    cfg.Update(cluster_info, logging.error)
+  except errors.ConfigurationError, err:
+    logging.error("Error while trying to set the new master: %s",
+                  str(err))
+    return 1
+
+  # if cfg.Update worked, then it means the old master daemon won't be
+  # able now to write its own config file (we rely on locking in both
+  # backend.UploadFile() and ConfigWriter._Write(); hence the next
+  # step is to kill the old master
+
+  logging.info("Stopping the master daemon on node %s", old_master)
+
   result = rpc.RpcRunner.call_node_stop_master(old_master, True)
   msg = result.fail_msg
   if msg:
     logging.error("Could not disable the master role on the old master"
                  " %s, please disable manually: %s", old_master, msg)
 
+  logging.info("Checking master IP non-reachability...")
+
   master_ip = sstore.GetMasterIP()
   total_timeout = 30
   # Here we have a phase where no master should be running
   def _check_ip():
-    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
+    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
       raise utils.RetryAgain()
 
   try:
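The failover steps were reordered: the configuration is rewritten first
(accept_foreign=True lets ConfigWriter open a config that still names another
node as master), and only then is the old master daemon stopped and the master
IP polled until it stops answering. Condensed from the surrounding hunks:

    # New MasterFailover ordering (condensed sketch):
    cfg = config.ConfigWriter(accept_foreign=True)
    cluster_info = cfg.GetClusterInfo()
    cluster_info.master_node = new_master
    cfg.Update(cluster_info, logging.error)  # also regenerates ssconf
    rpc.RpcRunner.call_node_stop_master(old_master, True)
    # ... wait until the master IP stops answering netutils.TcpPing ...
    rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)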
@@ -574,15 +660,7 @@ def MasterFailover(no_voting=False):
                     " continuing but activating the master on the current"
                     " node will probably fail", total_timeout)
 
                     " continuing but activating the master on the current"
                     " node will probably fail", total_timeout)
 
-  # instantiate a real config writer, as we now know we have the
-  # configuration data
-  cfg = config.ConfigWriter()
-
-  cluster_info = cfg.GetClusterInfo()
-  cluster_info.master_node = new_master
-  # this will also regenerate the ssconf files, since we updated the
-  # cluster info
-  cfg.Update(cluster_info, logging.error)
+  logging.info("Starting the master daemons on the new master")
 
   result = rpc.RpcRunner.call_node_start_master(new_master, True, no_voting)
   msg = result.fail_msg
@@ -591,6 +669,7 @@ def MasterFailover(no_voting=False):
                   " %s, please check: %s", new_master, msg)
     rcode = 1
 
                   " %s, please check: %s", new_master, msg)
     rcode = 1
 
+  logging.info("Master failed over from %s to %s", old_master, new_master)
   return rcode
 
 
@@ -631,7 +710,7 @@ def GatherMasterVotes(node_list):
   @return: list of (node, votes)
 
   """
-  myself = utils.HostInfo().name
+  myself = netutils.Hostname.GetSysName()
   try:
     node_list.remove(myself)
   except ValueError:
@@ -653,6 +732,7 @@ def GatherMasterVotes(node_list):
     if msg:
       logging.warning("Error contacting node %s: %s", node, msg)
       fail = True
+    # for now we accept both length 3 and 4 (data[3] is primary ip version)
     elif not isinstance(data, (tuple, list)) or len(data) < 3:
       logging.warning("Invalid data received from node %s: %s", node, data)
       fail = True