from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
class LogicalUnit(object):
_CheckOSVariant(result.payload, os_name)
+def _RequireFileStorage():
+ """Checks that file storage is enabled.
+
+ @raise errors.OpPrereqError: when file storage is disabled
+
+ """
+ if not constants.ENABLE_FILE_STORAGE:
+ raise errors.OpPrereqError("File storage disabled at configure time",
+ errors.ECODE_INVAL)
+
+
def _CheckDiskTemplate(template):
"""Ensure a given disk template is valid.
msg = ("Invalid disk template name '%s', valid templates are: %s" %
(template, utils.CommaJoin(constants.DISK_TEMPLATES)))
raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+ if template == constants.DT_FILE:
+ _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+ """Ensure a given storage type is valid.
+
+ """
+ if storage_type not in constants.VALID_STORAGE_TYPES:
+ raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+ errors.ECODE_INVAL)
+ if storage_type == constants.ST_FILE:
+ _RequireFileStorage()
+
+
+def _GetClusterDomainSecret():
+ """Reads the cluster domain secret.
+
+ """
+ return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
+ strict=True)
def _CheckInstanceDown(lu, instance, reason):
return faulty
-def _FormatTimestamp(secs):
- """Formats a Unix timestamp with the local timezone.
-
- """
- return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
class LUPostInitCluster(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
return master
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
- warn_days=constants.SSL_CERT_EXPIRATION_WARN,
- error_days=constants.SSL_CERT_EXPIRATION_ERROR):
- """Verifies certificate details for LUVerifyCluster.
-
- """
- if expired:
- msg = "Certificate %s is expired" % filename
-
- if not_before is not None and not_after is not None:
- msg += (" (valid from %s to %s)" %
- (_FormatTimestamp(not_before),
- _FormatTimestamp(not_after)))
- elif not_before is not None:
- msg += " (valid from %s)" % _FormatTimestamp(not_before)
- elif not_after is not None:
- msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- elif not_before is not None and not_before > now:
- return (LUVerifyCluster.ETYPE_WARNING,
- "Certificate %s not yet valid (valid from %s)" %
- (filename, _FormatTimestamp(not_before)))
-
- elif not_after is not None:
- remaining_days = int((not_after - now) / (24 * 3600))
-
- msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
- if remaining_days <= error_days:
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- if remaining_days <= warn_days:
- return (LUVerifyCluster.ETYPE_WARNING, msg)
-
- return (None, None)
-
-
def _VerifyCertificate(filename):
"""Verifies a certificate for LUVerifyCluster.
return (LUVerifyCluster.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
- # Depending on the pyOpenSSL version, this can just return (None, None)
- (not_before, not_after) = utils.GetX509CertValidity(cert)
+ (errcode, msg) = \
+ utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+ constants.SSL_CERT_EXPIRATION_ERROR)
+
+ if msg:
+ fnamemsg = "While verifying %s: %s" % (filename, msg)
+ else:
+ fnamemsg = None
+
+ if errcode is None:
+ return (None, fnamemsg)
+ elif errcode == utils.CERT_WARNING:
+ return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+ elif errcode == utils.CERT_ERROR:
+ return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
- return _VerifyCertificateInner(filename, cert.has_expired(),
- not_before, not_after, time.time())
+ raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
class LUVerifyCluster(LogicalUnit):
"tcp communication with node '%s': %s",
anode, nresult[constants.NV_NODENETTEST][anode])
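+    # check whether the node returned master IP reachability data; the payload
+    # is expected to be a boolean (True if the master IP was reachable)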
+    test = constants.NV_MASTERIP not in nresult
+ _ErrorIf(test, self.ENODENET, node,
+ "node hasn't returned node master IP reachability data")
+ if not test:
+ if not nresult[constants.NV_MASTERIP]:
+ if node == self.master_node:
+ msg = "the master node cannot reach the master IP (not configured?)"
+ else:
+ msg = "cannot reach the master IP"
+ _ErrorIf(True, self.ENODENET, node, msg)
+
+
def _VerifyInstance(self, instance, instanceconfig, node_image):
"""Verify an instance.
vg_name = self.cfg.GetVGName()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+ cluster = self.cfg.GetClusterInfo()
nodelist = utils.NiceSort(self.cfg.GetNodeList())
nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
instancelist = utils.NiceSort(self.cfg.GetInstanceList())
# FIXME: verify OS list
# do local checksums
master_files = [constants.CLUSTER_CONF_FILE]
+ master_node = self.master_node = self.cfg.GetMasterNode()
+ master_ip = self.cfg.GetMasterIP()
file_names = ssconf.SimpleStore().GetFileList()
file_names.extend(constants.ALL_CERT_FILES)
file_names.extend(master_files)
+ if cluster.modify_etc_hosts:
+ file_names.append(constants.ETC_HOSTS)
local_checksums = utils.FingerprintFiles(file_names)
constants.NV_HVINFO: self.cfg.GetHypervisorType(),
constants.NV_NODESETUP: None,
constants.NV_TIME: None,
+ constants.NV_MASTERIP: (master_node, master_ip),
}
if vg_name is not None:
self.cfg.GetClusterName())
nvinfo_endtime = time.time()
- cluster = self.cfg.GetClusterInfo()
- master_node = self.cfg.GetMasterNode()
all_drbd_map = self.cfg.ComputeDRBDMap()
feedback_fn("* Verifying node status")
feedback_fn("* Verifying orphan volumes")
self._VerifyOrphanVolumes(node_vol_should, node_image)
- feedback_fn("* Verifying oprhan instances")
+ feedback_fn("* Verifying orphan instances")
self._VerifyOrphanInstances(instancelist, node_image)
if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
"""Check parameters
"""
- if not hasattr(self.op, "candidate_pool_size"):
- self.op.candidate_pool_size = None
+ for attr in ["candidate_pool_size",
+ "uid_pool", "add_uids", "remove_uids"]:
+ if not hasattr(self.op, attr):
+ setattr(self.op, attr, None)
+
if self.op.candidate_pool_size is not None:
try:
self.op.candidate_pool_size = int(self.op.candidate_pool_size)
if self.op.candidate_pool_size < 1:
raise errors.OpPrereqError("At least one master candidate needed",
errors.ECODE_INVAL)
+
_CheckBooleanOpField(self.op, "maintain_node_health")
+ if self.op.uid_pool:
+ uidpool.CheckUidPool(self.op.uid_pool)
+
+ if self.op.add_uids:
+ uidpool.CheckUidPool(self.op.add_uids)
+
+ if self.op.remove_uids:
+ uidpool.CheckUidPool(self.op.remove_uids)
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
"\n".join(nic_errors))
# hypervisor list/parameters
- self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+ self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if self.op.hvparams:
if not isinstance(self.op.hvparams, dict):
raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
else:
self.new_os_hvp[os_name][hv_name].update(hv_dict)
+ # changes to the hypervisor list
if self.op.enabled_hypervisors is not None:
self.hv_list = self.op.enabled_hypervisors
if not self.hv_list:
" entries: %s" %
utils.CommaJoin(invalid_hvs),
errors.ECODE_INVAL)
+ for hv in self.hv_list:
+ # if the hypervisor doesn't already exist in the cluster
+ # hvparams, we initialize it to empty, and then (in both
+ # cases) we make sure to fill the defaults, as we might not
+ # have a complete defaults list if the hypervisor wasn't
+ # enabled before
+ if hv not in new_hvp:
+ new_hvp[hv] = {}
+ new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+ utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
else:
self.hv_list = cluster.enabled_hypervisors
if self.op.os_hvp:
self.cluster.os_hvp = self.new_os_hvp
if self.op.enabled_hypervisors is not None:
+ self.cluster.hvparams = self.new_hvparams
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
if self.op.maintain_node_health is not None:
self.cluster.maintain_node_health = self.op.maintain_node_health
+ if self.op.add_uids is not None:
+ uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+ if self.op.remove_uids is not None:
+ uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+ if self.op.uid_pool is not None:
+ self.cluster.uid_pool = self.op.uid_pool
+
self.cfg.Update(self.cluster, feedback_fn)
constants.RAPI_CERT_FILE,
constants.RAPI_USERS_FILE,
constants.CONFD_HMAC_KEY,
+ constants.CLUSTER_DOMAIN_SECRET_FILE,
])
enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
_RedistributeAncillaryFiles(self)
-def _WaitForSync(lu, instance, oneshot=False):
+def _WaitForSync(lu, instance, disks=None, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
"""
- if not instance.disks:
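+  # "disks is not None and not disks" catches a caller that explicitly passed
+  # an empty disk list, in which case there is nothing to wait for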
+ if not instance.disks or disks is not None and not disks:
return True
+ disks = _ExpandCheckDisks(instance, disks)
+
if not oneshot:
lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
node = instance.primary_node
- for dev in instance.disks:
+ for dev in disks:
lu.cfg.SetDiskID(dev, node)
# TODO: Convert to utils.Retry
max_time = 0
done = True
cumul_degraded = False
- rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
+ rstats = lu.rpc.call_blockdev_getmirrorstatus(node, disks)
msg = rstats.fail_msg
if msg:
lu.LogWarning("Can't get any data from node %s: %s", node, msg)
for i, mstat in enumerate(rstats):
if mstat is None:
lu.LogWarning("Can't compute data for node %s/%s",
- node, instance.disks[i].iv_name)
+ node, disks[i].iv_name)
continue
cumul_degraded = (cumul_degraded or
else:
rem_time = "no time estimate"
lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
- (instance.disks[i].iv_name, mstat.sync_percent,
- rem_time))
+ (disks[i].iv_name, mstat.sync_percent, rem_time))
# if we're done but degraded, let's do a few small retries, to
# make sure we see a stable and not transient situation; therefore
self.LogWarning("Errors encountered on the remote node while leaving"
" the cluster: %s", msg)
+ # Remove node from our /etc/hosts
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ # FIXME: this should be done via an rpc call to node daemon
+ utils.RemoveHostFromEtcHosts(node.name)
+ _RedistributeAncillaryFiles(self)
+
class LUQueryNodes(NoHooksLU):
"""Logical unit for querying nodes.
REQ_BGL = False
_FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
- def ExpandNames(self):
- storage_type = self.op.storage_type
-
- if storage_type not in constants.VALID_STORAGE_TYPES:
- raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
- errors.ECODE_INVAL)
+ def CheckArguments(self):
+ _CheckStorageType(self.op.storage_type)
_CheckOutputFields(static=self._FIELDS_STATIC,
dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
selected=self.op.output_fields)
+ def ExpandNames(self):
self.needed_locks = {}
self.share_locks[locking.LEVEL_NODE] = 1
def CheckArguments(self):
self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
- storage_type = self.op.storage_type
- if storage_type not in constants.VALID_STORAGE_TYPES:
- raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
- errors.ECODE_INVAL)
+ _CheckStorageType(self.op.storage_type)
def ExpandNames(self):
self.needed_locks = {
raise errors.OpPrereqError("Node %s is not in the configuration" % node,
errors.ECODE_NOENT)
+ self.changed_primary_ip = False
+
for existing_node_name in node_list:
existing_node = cfg.GetNodeInfo(existing_node_name)
if self.op.readd and node == existing_node_name:
- if (existing_node.primary_ip != primary_ip or
- existing_node.secondary_ip != secondary_ip):
+ if existing_node.secondary_ip != secondary_ip:
raise errors.OpPrereqError("Readded node doesn't have the same IP"
" address configuration as before",
errors.ECODE_INVAL)
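+      # a changed primary IP is allowed on readd; remember it so that it can
+      # be applied to the node object later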
+ if existing_node.primary_ip != primary_ip:
+ self.changed_primary_ip = True
+
continue
if (existing_node.primary_ip == primary_ip or
self.LogInfo("Readding a node, the offline/drained flags were reset")
# if we demote the node, we do cleanup later in the procedure
new_node.master_candidate = self.master_candidate
+ if self.changed_primary_ip:
+ new_node.primary_ip = self.op.primary_ip
# notify the user about any possible mc promotion
if new_node.master_candidate:
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
+ # FIXME: this should be done via an rpc call to node daemon
utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
"mtime": cluster.mtime,
"uuid": cluster.uuid,
"tags": list(cluster.GetTags()),
+ "uid_pool": cluster.uid_pool,
}
return result
return disks_info
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
ignore_size=False):
"""Prepare the block devices for an instance.
@param lu: the logical unit on whose behalf we execute
@type instance: L{objects.Instance}
@param instance: the instance for whose disks we assemble
+ @type disks: list of L{objects.Disk} or None
+ @param disks: which disks to assemble (or all, if None)
@type ignore_secondaries: boolean
@param ignore_secondaries: if true, errors on secondary nodes
won't result in an error return from the function
device_info = []
disks_ok = True
iname = instance.name
+ disks = _ExpandCheckDisks(instance, disks)
+
# With the two passes mechanism we try to reduce the window of
# opportunity for the race condition of switching DRBD to primary
# before handshaking occured, but we do not eliminate it
# SyncSource, etc.)
# 1st pass, assemble on all nodes in secondary mode
- for inst_disk in instance.disks:
+ for inst_disk in disks:
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
if ignore_size:
node_disk = node_disk.Copy()
# FIXME: race condition on drbd migration to primary
# 2nd pass, do only the primary node
- for inst_disk in instance.disks:
+ for inst_disk in disks:
dev_path = None
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
# leave the disks configured for the primary node
# this is a workaround that would be fixed better by
# improving the logical/physical id handling
- for disk in instance.disks:
+ for disk in disks:
lu.cfg.SetDiskID(disk, instance.primary_node)
return disks_ok, device_info
_SafeShutdownInstanceDisks(self, instance)
-def _SafeShutdownInstanceDisks(lu, instance):
+def _SafeShutdownInstanceDisks(lu, instance, disks=None):
"""Shutdown block devices of an instance.
This function checks if an instance is running, before calling
"""
_CheckInstanceDown(lu, instance, "cannot shutdown disks")
- _ShutdownInstanceDisks(lu, instance)
+ _ShutdownInstanceDisks(lu, instance, disks=disks)
+
+def _ExpandCheckDisks(instance, disks):
+ """Return the instance disks selected by the disks list
-def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
+ @type disks: list of L{objects.Disk} or None
+ @param disks: selected disks
+ @rtype: list of L{objects.Disk}
+ @return: selected instance disks to act on
+
+ """
+ if disks is None:
+ return instance.disks
+ else:
+ if not set(disks).issubset(instance.disks):
+ raise errors.ProgrammerError("Can only act on disks belonging to the"
+ " target instance")
+ return disks
+
+
+def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
"""Shutdown block devices of an instance.
This does the shutdown on all nodes of the instance.
"""
all_result = True
- for disk in instance.disks:
+ disks = _ExpandCheckDisks(instance, disks)
+
+ for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
lu.cfg.SetDiskID(top_disk, node)
result = lu.rpc.call_blockdev_shutdown(node, top_disk)
" node %s: %s" %
(instance.name, instance.primary_node, msg))
- logging.info("Removing block devices for instance %s", instance.name)
+ _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
- if not _RemoveDisks(self, instance):
- if self.op.ignore_failures:
- feedback_fn("Warning: can't remove instance's disks")
- else:
- raise errors.OpExecError("Can't remove instance's disks")
- logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+ """Utility function to remove an instance.
- self.cfg.RemoveInstance(instance.name)
- self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+ """
+ logging.info("Removing block devices for instance %s", instance.name)
+
+ if not _RemoveDisks(lu, instance):
+ if not ignore_failures:
+ raise errors.OpExecError("Can't remove instance's disks")
+ feedback_fn("Warning: can't remove instance's disks")
+
+ logging.info("Removing instance %s out of cluster config", instance.name)
+
+ lu.cfg.RemoveInstance(instance.name)
+
+ assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+ "Instance lock removal conflict"
+
+ # Remove lock for the instance
+ lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
+ _RequireFileStorage()
+
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
"""
HPATH = "instance-add"
HTYPE = constants.HTYPE_INSTANCE
- _OP_REQP = ["instance_name", "disks", "disk_template",
+ _OP_REQP = ["instance_name", "disks",
"mode", "start",
"wait_for_sync", "ip_check", "nics",
"hvparams", "beparams"]
"""
# set optional parameters to none if they don't exist
- for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
+ for attr in ["pnode", "snode", "iallocator", "hypervisor",
+ "disk_template", "identify_defaults"]:
if not hasattr(self.op, attr):
setattr(self.op, attr, None)
# TODO: make the ip check more flexible and not depend on the name check
raise errors.OpPrereqError("Cannot do ip checks without a name check",
errors.ECODE_INVAL)
- if (self.op.disk_template == constants.DT_FILE and
- not constants.ENABLE_FILE_STORAGE):
- raise errors.OpPrereqError("File storage disabled at configure time",
- errors.ECODE_INVAL)
# check disk information: either all adopt, or no adopt
has_adopt = has_no_adopt = False
for disk in self.op.disks:
else:
has_no_adopt = True
if has_adopt and has_no_adopt:
- raise errors.OpPrereqError("Either all disks have are adoped or none is",
+ raise errors.OpPrereqError("Either all disks are adopted or none is",
errors.ECODE_INVAL)
if has_adopt:
if self.op.disk_template != constants.DT_PLAIN:
self.adopt_disks = has_adopt
- def ExpandNames(self):
- """ExpandNames for CreateInstance.
-
- Figure out the right locks for instance creation.
-
- """
- self.needed_locks = {}
-
- # cheap checks, mostly valid constants given
-
# verify creation mode
- if self.op.mode not in (constants.INSTANCE_CREATE,
- constants.INSTANCE_IMPORT):
+ if self.op.mode not in constants.INSTANCE_CREATE_MODES:
raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
- # disk template and mirror node verification
- _CheckDiskTemplate(self.op.disk_template)
-
- if self.op.hypervisor is None:
- self.op.hypervisor = self.cfg.GetHypervisorType()
-
- cluster = self.cfg.GetClusterInfo()
- enabled_hvs = cluster.enabled_hypervisors
- if self.op.hypervisor not in enabled_hvs:
- raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
- " cluster (%s)" % (self.op.hypervisor,
- ",".join(enabled_hvs)),
- errors.ECODE_STATE)
-
- # check hypervisor parameter syntax (locally)
- utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
- filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
- self.op.hvparams)
- hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
- hv_type.CheckParameterSyntax(filled_hvp)
- self.hv_full = filled_hvp
- # check that we don't specify global parameters on an instance
- _CheckGlobalHvParams(self.op.hvparams)
-
- # fill and remember the beparams dict
- utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
- self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
- self.op.beparams)
-
- #### instance parameters check
-
# instance name verification
if self.op.name_check:
- hostname1 = utils.GetHostInfo(self.op.instance_name)
- self.op.instance_name = instance_name = hostname1.name
+ self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+ self.op.instance_name = self.hostname1.name
# used in CheckPrereq for ip ping check
- self.check_ip = hostname1.ip
+ self.check_ip = self.hostname1.ip
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+      raise errors.OpPrereqError("Remote imports require names to be checked",
+                                 errors.ECODE_INVAL)
else:
- instance_name = self.op.instance_name
self.check_ip = None
- # this is just a preventive check, but someone might still add this
- # instance in the meantime, and creation will fail at lock-add time
- if instance_name in self.cfg.GetInstanceList():
- raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
- instance_name, errors.ECODE_EXISTS)
-
- self.add_locks[locking.LEVEL_INSTANCE] = instance_name
-
- # NIC buildup
- self.nics = []
- for idx, nic in enumerate(self.op.nics):
- nic_mode_req = nic.get("mode", None)
- nic_mode = nic_mode_req
- if nic_mode is None:
- nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
- # in routed mode, for the first nic, the default ip is 'auto'
- if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
- default_ip_mode = constants.VALUE_AUTO
- else:
- default_ip_mode = constants.VALUE_NONE
-
- # ip validity checks
- ip = nic.get("ip", default_ip_mode)
- if ip is None or ip.lower() == constants.VALUE_NONE:
- nic_ip = None
- elif ip.lower() == constants.VALUE_AUTO:
- if not self.op.name_check:
- raise errors.OpPrereqError("IP address set to auto but name checks"
- " have been skipped. Aborting.",
- errors.ECODE_INVAL)
- nic_ip = hostname1.ip
- else:
- if not utils.IsValidIP(ip):
- raise errors.OpPrereqError("Given IP address '%s' doesn't look"
- " like a valid IP" % ip,
- errors.ECODE_INVAL)
- nic_ip = ip
-
- # TODO: check the ip address for uniqueness
- if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
- raise errors.OpPrereqError("Routed nic mode requires an ip address",
- errors.ECODE_INVAL)
-
- # MAC address verification
- mac = nic.get("mac", constants.VALUE_AUTO)
- if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
- mac = utils.NormalizeAndValidateMac(mac)
-
- try:
- self.cfg.ReserveMAC(mac, self.proc.GetECId())
- except errors.ReservationError:
- raise errors.OpPrereqError("MAC address %s already in use"
- " in cluster" % mac,
- errors.ECODE_NOTUNIQUE)
-
- # bridge verification
- bridge = nic.get("bridge", None)
- link = nic.get("link", None)
- if bridge and link:
- raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
- " at the same time", errors.ECODE_INVAL)
- elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
- raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
- errors.ECODE_INVAL)
- elif bridge:
- link = bridge
-
- nicparams = {}
- if nic_mode_req:
- nicparams[constants.NIC_MODE] = nic_mode_req
- if link:
- nicparams[constants.NIC_LINK] = link
-
- check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
- nicparams)
- objects.NIC.CheckParameterSyntax(check_params)
- self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
-
- # disk checks/pre-build
- self.disks = []
- for disk in self.op.disks:
- mode = disk.get("mode", constants.DISK_RDWR)
- if mode not in constants.DISK_ACCESS_SET:
- raise errors.OpPrereqError("Invalid disk access mode '%s'" %
- mode, errors.ECODE_INVAL)
- size = disk.get("size", None)
- if size is None:
- raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
- try:
- size = int(size)
- except (TypeError, ValueError):
- raise errors.OpPrereqError("Invalid disk size '%s'" % size,
- errors.ECODE_INVAL)
- new_disk = {"size": size, "mode": mode}
- if "adopt" in disk:
- new_disk["adopt"] = disk["adopt"]
- self.disks.append(new_disk)
-
# file storage checks
if (self.op.file_driver and
not self.op.file_driver in constants.FILE_DRIVER):
" node must be given",
errors.ECODE_INVAL)
- if self.op.iallocator:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- else:
- self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
- nodelist = [self.op.pnode]
- if self.op.snode is not None:
- self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
- nodelist.append(self.op.snode)
- self.needed_locks[locking.LEVEL_NODE] = nodelist
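+    # the cluster domain secret is needed to verify the handshake and the
+    # signed source CA of remote imports (see the INSTANCE_REMOTE_IMPORT
+    # branch below)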
+ self._cds = _GetClusterDomainSecret()
- # in case of import lock the source node too
if self.op.mode == constants.INSTANCE_IMPORT:
- src_node = getattr(self.op, "src_node", None)
- src_path = getattr(self.op, "src_path", None)
+ # On import force_variant must be True, because if we forced it at
+ # initial install, our only chance when importing it back is that it
+ # works again!
+ self.op.force_variant = True
- if src_path is None:
- self.op.src_path = src_path = self.op.instance_name
+ if self.op.no_install:
+ self.LogInfo("No-installation mode has no effect during import")
- if src_node is None:
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
- self.op.src_node = None
- if os.path.isabs(src_path):
- raise errors.OpPrereqError("Importing an instance from an absolute"
- " path requires a source node option.",
- errors.ECODE_INVAL)
- else:
- self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
- if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
- self.needed_locks[locking.LEVEL_NODE].append(src_node)
- if not os.path.isabs(src_path):
- self.op.src_path = src_path = \
- utils.PathJoin(constants.EXPORT_DIR, src_path)
-
- # On import force_variant must be True, because if we forced it at
- # initial install, our only chance when importing it back is that it
- # works again!
- self.op.force_variant = True
-
- if self.op.no_install:
- self.LogInfo("No-installation mode has no effect during import")
-
- else: # INSTANCE_CREATE
+ elif self.op.mode == constants.INSTANCE_CREATE:
if getattr(self.op, "os_type", None) is None:
raise errors.OpPrereqError("No guest OS specified",
errors.ECODE_INVAL)
self.op.force_variant = getattr(self.op, "force_variant", False)
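+      # for a plain create the disk template is mandatory; for imports it can
+      # also be taken from the export information (see _ReadExportParams)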
+ if self.op.disk_template is None:
+ raise errors.OpPrereqError("No disk template specified",
+ errors.ECODE_INVAL)
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ # Check handshake to ensure both clusters have the same domain secret
+ src_handshake = getattr(self.op, "source_handshake", None)
+ if not src_handshake:
+ raise errors.OpPrereqError("Missing source handshake",
+ errors.ECODE_INVAL)
+
+ errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
+ src_handshake)
+ if errmsg:
+ raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
+ errors.ECODE_INVAL)
+
+ # Load and check source CA
+ self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
+ if not self.source_x509_ca_pem:
+ raise errors.OpPrereqError("Missing source X509 CA",
+ errors.ECODE_INVAL)
+
+ try:
+ (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
+ self._cds)
+ except OpenSSL.crypto.Error, err:
+ raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
+ (err, ), errors.ECODE_INVAL)
+
+ (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+ if errcode is not None:
+ raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
+ errors.ECODE_INVAL)
+
+ self.source_x509_ca = cert
+
+ src_instance_name = getattr(self.op, "source_instance_name", None)
+ if not src_instance_name:
+ raise errors.OpPrereqError("Missing source instance name",
+ errors.ECODE_INVAL)
+
+ self.source_instance_name = \
+ utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+
+ else:
+ raise errors.OpPrereqError("Invalid instance creation mode %r" %
+ self.op.mode, errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ """ExpandNames for CreateInstance.
+
+ Figure out the right locks for instance creation.
+
+ """
+ self.needed_locks = {}
+
+ instance_name = self.op.instance_name
+ # this is just a preventive check, but someone might still add this
+ # instance in the meantime, and creation will fail at lock-add time
+ if instance_name in self.cfg.GetInstanceList():
+ raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+ instance_name, errors.ECODE_EXISTS)
+
+ self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
+ if self.op.iallocator:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ else:
+ self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
+ nodelist = [self.op.pnode]
+ if self.op.snode is not None:
+ self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
+ nodelist.append(self.op.snode)
+ self.needed_locks[locking.LEVEL_NODE] = nodelist
+
+ # in case of import lock the source node too
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ src_node = getattr(self.op, "src_node", None)
+ src_path = getattr(self.op, "src_path", None)
+
+ if src_path is None:
+ self.op.src_path = src_path = self.op.instance_name
+
+ if src_node is None:
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+ self.op.src_node = None
+ if os.path.isabs(src_path):
+ raise errors.OpPrereqError("Importing an instance from an absolute"
+ " path requires a source node option.",
+ errors.ECODE_INVAL)
+ else:
+ self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
+ if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+ self.needed_locks[locking.LEVEL_NODE].append(src_node)
+ if not os.path.isabs(src_path):
+ self.op.src_path = src_path = \
+ utils.PathJoin(constants.EXPORT_DIR, src_path)
def _RunAllocator(self):
"""Run the allocator based on input opcode.
self.secondaries)
return env, nl, nl
+ def _ReadExportInfo(self):
+ """Reads the export information from disk.
+
+ It will override the opcode source node and path with the actual
+ information, if these two were not specified before.
+
+ @return: the export information
+
+ """
+ assert self.op.mode == constants.INSTANCE_IMPORT
+
+ src_node = self.op.src_node
+ src_path = self.op.src_path
+
+ if src_node is None:
+ locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+ exp_list = self.rpc.call_export_list(locked_nodes)
+ found = False
+ for node in exp_list:
+ if exp_list[node].fail_msg:
+ continue
+ if src_path in exp_list[node].payload:
+ found = True
+ self.op.src_node = src_node = node
+ self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+ src_path)
+ break
+ if not found:
+ raise errors.OpPrereqError("No export found for relative path %s" %
+ src_path, errors.ECODE_INVAL)
+
+ _CheckNodeOnline(self, src_node)
+ result = self.rpc.call_export_info(src_node, src_path)
+ result.Raise("No export or invalid export found in dir %s" % src_path)
+
+ export_info = objects.SerializableConfigParser.Loads(str(result.payload))
+ if not export_info.has_section(constants.INISECT_EXP):
+ raise errors.ProgrammerError("Corrupted export config",
+ errors.ECODE_ENVIRON)
+
+ ei_version = export_info.get(constants.INISECT_EXP, "version")
+ if (int(ei_version) != constants.EXPORT_VERSION):
+ raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
+ (ei_version, constants.EXPORT_VERSION),
+ errors.ECODE_ENVIRON)
+ return export_info
+
+ def _ReadExportParams(self, einfo):
+ """Use export parameters as defaults.
+
+    If the opcode doesn't specify (i.e. doesn't override) some instance
+    parameters, try to take them from the export information, if it
+    declares them.
+
+ """
+ self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
+
+ if self.op.disk_template is None:
+ if einfo.has_option(constants.INISECT_INS, "disk_template"):
+ self.op.disk_template = einfo.get(constants.INISECT_INS,
+ "disk_template")
+ else:
+ raise errors.OpPrereqError("No disk template specified and the export"
+ " is missing the disk_template information",
+ errors.ECODE_INVAL)
+
+ if not self.op.disks:
+ if einfo.has_option(constants.INISECT_INS, "disk_count"):
+ disks = []
+ # TODO: import the disk iv_name too
+ for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+ disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
+ disks.append({"size": disk_sz})
+ self.op.disks = disks
+ else:
+ raise errors.OpPrereqError("No disk info specified and the export"
+ " is missing the disk information",
+ errors.ECODE_INVAL)
+
+ if (not self.op.nics and
+ einfo.has_option(constants.INISECT_INS, "nic_count")):
+ nics = []
+ for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
+ ndict = {}
+ for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+ v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+ ndict[name] = v
+ nics.append(ndict)
+ self.op.nics = nics
+
+ if (self.op.hypervisor is None and
+ einfo.has_option(constants.INISECT_INS, "hypervisor")):
+ self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
+ if einfo.has_section(constants.INISECT_HYP):
+ # use the export parameters but do not override the ones
+ # specified by the user
+ for name, value in einfo.items(constants.INISECT_HYP):
+ if name not in self.op.hvparams:
+ self.op.hvparams[name] = value
+
+ if einfo.has_section(constants.INISECT_BEP):
+ # use the parameters, without overriding
+ for name, value in einfo.items(constants.INISECT_BEP):
+ if name not in self.op.beparams:
+ self.op.beparams[name] = value
+ else:
+ # try to read the parameters old style, from the main section
+ for name in constants.BES_PARAMETERS:
+ if (name not in self.op.beparams and
+ einfo.has_option(constants.INISECT_INS, name)):
+ self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
+
+ def _RevertToDefaults(self, cluster):
+ """Revert the instance parameters to the default values.
+
+ """
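+    # drop any parameter whose value matches the cluster default, leaving only
+    # the explicit overrides in the opcode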
+ # hvparams
+ hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type)
+ for name in self.op.hvparams.keys():
+ if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
+ del self.op.hvparams[name]
+ # beparams
+ be_defs = cluster.beparams.get(constants.PP_DEFAULT, {})
+ for name in self.op.beparams.keys():
+ if name in be_defs and be_defs[name] == self.op.beparams[name]:
+ del self.op.beparams[name]
+ # nic params
+ nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {})
+ for nic in self.op.nics:
+ for name in constants.NICS_PARAMETERS:
+ if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
+ del nic[name]
def CheckPrereq(self):
"""Check prerequisites.
"""
+ if self.op.mode == constants.INSTANCE_IMPORT:
+ export_info = self._ReadExportInfo()
+ self._ReadExportParams(export_info)
+
+ _CheckDiskTemplate(self.op.disk_template)
+
if (not self.cfg.GetVGName() and
self.op.disk_template not in constants.DTS_NOT_LVM):
raise errors.OpPrereqError("Cluster does not support lvm-based"
" instances", errors.ECODE_STATE)
- if self.op.mode == constants.INSTANCE_IMPORT:
- src_node = self.op.src_node
- src_path = self.op.src_path
+ if self.op.hypervisor is None:
+ self.op.hypervisor = self.cfg.GetHypervisorType()
- if src_node is None:
- locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
- exp_list = self.rpc.call_export_list(locked_nodes)
- found = False
- for node in exp_list:
- if exp_list[node].fail_msg:
- continue
- if src_path in exp_list[node].payload:
- found = True
- self.op.src_node = src_node = node
- self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
- src_path)
- break
- if not found:
- raise errors.OpPrereqError("No export found for relative path %s" %
- src_path, errors.ECODE_INVAL)
-
- _CheckNodeOnline(self, src_node)
- result = self.rpc.call_export_info(src_node, src_path)
- result.Raise("No export or invalid export found in dir %s" % src_path)
-
- export_info = objects.SerializableConfigParser.Loads(str(result.payload))
- if not export_info.has_section(constants.INISECT_EXP):
- raise errors.ProgrammerError("Corrupted export config",
- errors.ECODE_ENVIRON)
-
- ei_version = export_info.get(constants.INISECT_EXP, 'version')
- if (int(ei_version) != constants.EXPORT_VERSION):
- raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
- (ei_version, constants.EXPORT_VERSION),
- errors.ECODE_ENVIRON)
+ cluster = self.cfg.GetClusterInfo()
+ enabled_hvs = cluster.enabled_hypervisors
+ if self.op.hypervisor not in enabled_hvs:
+ raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
+ " cluster (%s)" % (self.op.hypervisor,
+ ",".join(enabled_hvs)),
+ errors.ECODE_STATE)
+
+ # check hypervisor parameter syntax (locally)
+ utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
+ filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor,
+ self.op.os_type),
+ self.op.hvparams)
+ hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
+ hv_type.CheckParameterSyntax(filled_hvp)
+ self.hv_full = filled_hvp
+ # check that we don't specify global parameters on an instance
+ _CheckGlobalHvParams(self.op.hvparams)
+
+ # fill and remember the beparams dict
+ utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
+ self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
+ self.op.beparams)
+
+ # now that hvp/bep are in final format, let's reset to defaults,
+ # if told to do so
+ if self.op.identify_defaults:
+ self._RevertToDefaults(cluster)
+
+ # NIC buildup
+ self.nics = []
+ for idx, nic in enumerate(self.op.nics):
+ nic_mode_req = nic.get("mode", None)
+ nic_mode = nic_mode_req
+ if nic_mode is None:
+ nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+ # in routed mode, for the first nic, the default ip is 'auto'
+ if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+ default_ip_mode = constants.VALUE_AUTO
+ else:
+ default_ip_mode = constants.VALUE_NONE
+
+ # ip validity checks
+ ip = nic.get("ip", default_ip_mode)
+ if ip is None or ip.lower() == constants.VALUE_NONE:
+ nic_ip = None
+ elif ip.lower() == constants.VALUE_AUTO:
+ if not self.op.name_check:
+ raise errors.OpPrereqError("IP address set to auto but name checks"
+ " have been skipped. Aborting.",
+ errors.ECODE_INVAL)
+ nic_ip = self.hostname1.ip
+ else:
+ if not utils.IsValidIP(ip):
+ raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+ " like a valid IP" % ip,
+ errors.ECODE_INVAL)
+ nic_ip = ip
+
+ # TODO: check the ip address for uniqueness
+ if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+ raise errors.OpPrereqError("Routed nic mode requires an ip address",
+ errors.ECODE_INVAL)
+
+ # MAC address verification
+ mac = nic.get("mac", constants.VALUE_AUTO)
+ if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+ mac = utils.NormalizeAndValidateMac(mac)
+
+ try:
+ self.cfg.ReserveMAC(mac, self.proc.GetECId())
+ except errors.ReservationError:
+ raise errors.OpPrereqError("MAC address %s already in use"
+ " in cluster" % mac,
+ errors.ECODE_NOTUNIQUE)
+
+ # bridge verification
+ bridge = nic.get("bridge", None)
+ link = nic.get("link", None)
+ if bridge and link:
+ raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+ " at the same time", errors.ECODE_INVAL)
+ elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
+ raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
+ errors.ECODE_INVAL)
+ elif bridge:
+ link = bridge
+
+ nicparams = {}
+ if nic_mode_req:
+ nicparams[constants.NIC_MODE] = nic_mode_req
+ if link:
+ nicparams[constants.NIC_LINK] = link
+
+ check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+ nicparams)
+ objects.NIC.CheckParameterSyntax(check_params)
+ self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+
+ # disk checks/pre-build
+ self.disks = []
+ for disk in self.op.disks:
+ mode = disk.get("mode", constants.DISK_RDWR)
+ if mode not in constants.DISK_ACCESS_SET:
+ raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+ mode, errors.ECODE_INVAL)
+ size = disk.get("size", None)
+ if size is None:
+ raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+ try:
+ size = int(size)
+ except (TypeError, ValueError):
+ raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+ errors.ECODE_INVAL)
+ new_disk = {"size": size, "mode": mode}
+ if "adopt" in disk:
+ new_disk["adopt"] = disk["adopt"]
+ self.disks.append(new_disk)
+
+ if self.op.mode == constants.INSTANCE_IMPORT:
# Check that the new instance doesn't have less disks than the export
instance_disks = len(self.disks)
(instance_disks, export_disks),
errors.ECODE_INVAL)
- self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
disk_images = []
for idx in range(export_disks):
option = 'disk%d_dump' % idx
if export_info.has_option(constants.INISECT_INS, option):
# FIXME: are the old os-es, disk sizes, etc. useful?
export_name = export_info.get(constants.INISECT_INS, option)
- image = utils.PathJoin(src_path, export_name)
+ image = utils.PathJoin(self.op.src_path, export_name)
disk_images.append(image)
else:
disk_images.append(False)
self.src_images = disk_images
old_name = export_info.get(constants.INISECT_INS, 'name')
- # FIXME: int() here could throw a ValueError on broken exports
- exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+ try:
+ exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid export file, nic_count is not"
+ " an integer: %s" % str(err),
+ errors.ECODE_STATE)
if self.op.instance_name == old_name:
for idx, nic in enumerate(self.nics):
if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
else:
network_port = None
- ##if self.op.vnc_bind_address is None:
- ## self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+ if constants.ENABLE_FILE_STORAGE:
+ # this is needed because os.path.join does not accept None arguments
+ if self.op.file_storage_dir is None:
+ string_file_storage_dir = ""
+ else:
+ string_file_storage_dir = self.op.file_storage_dir
- # this is needed because os.path.join does not accept None arguments
- if self.op.file_storage_dir is None:
- string_file_storage_dir = ""
+ # build the full file storage dir path
+ file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+ string_file_storage_dir, instance)
else:
- string_file_storage_dir = self.op.file_storage_dir
-
- # build the full file storage dir path
- file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
- string_file_storage_dir, instance)
-
+ file_storage_dir = ""
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
elif self.op.mode == constants.INSTANCE_IMPORT:
feedback_fn("* running the instance OS import scripts...")
- src_node = self.op.src_node
- src_images = self.src_images
- cluster_name = self.cfg.GetClusterName()
- # FIXME: pass debug option from opcode to backend
- import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
- src_node, src_images,
- cluster_name,
- self.op.debug_level)
- msg = import_result.fail_msg
- if msg:
- self.LogWarning("Error while importing the disk images for instance"
- " %s on node %s: %s" % (instance, pnode_name, msg))
+
+ transfers = []
+
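+    # build one transfer per exported dump file, targeting the matching disk
+    # of the newly created instance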
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ connect_timeout = constants.RIE_CONNECT_TIMEOUT
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+ disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ # Run rename script on newly imported instance
+ assert iobj.name == instance
+ feedback_fn("Running rename script for %s" % instance)
+ result = self.rpc.call_instance_run_rename(pnode_name, iobj,
+ self.source_instance_name,
+ self.op.debug_level)
+ if result.fail_msg:
+ self.LogWarning("Failed to run rename script for %s on node"
+ " %s: %s" % (instance, pnode_name, result.fail_msg))
+
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
def CheckArguments(self):
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ _CheckStorageType(self.op.storage_type)
+
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_name],
"""
instance = self.instance
disk = self.disk
+
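+    # the disk needs to be assembled (active) before it can be grown; if the
+    # instance is administratively down it is shut down again below, after the
+    # optional wait for sync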
+ disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
+ if not disks_ok:
+ raise errors.OpExecError("Cannot activate block device to grow")
+
for node in instance.all_nodes:
self.cfg.SetDiskID(disk, node)
result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
disk.RecordGrow(self.op.amount)
self.cfg.Update(instance, feedback_fn)
if self.op.wait_for_sync:
- disk_abort = not _WaitForSync(self, instance)
+ disk_abort = not _WaitForSync(self, instance, disks=[disk])
if disk_abort:
self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
" status.\nPlease check the instance.")
+ if not instance.admin_up:
+ _SafeShutdownInstanceDisks(self, instance, disks=[disk])
+ elif not instance.admin_up:
+ self.proc.LogWarning("Not shutting down the disk even if the instance is"
+ " not supposed to be running because no wait for"
+ " sync mode was requested.")
class LUQueryInstanceData(NoHooksLU):
"os": instance.os,
# this happens to be the same format used for hooks
"nics": _NICListToTuple(self, instance.nics),
+ "disk_template": instance.disk_template,
"disks": disks,
"hypervisor": instance.hypervisor,
"network_port": instance.network_port,
(constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
}
+
class LUQueryExports(NoHooksLU):
"""Query the exports list
return result
+class LUPrepareExport(NoHooksLU):
+ """Prepares an instance for an export and returns useful information.
+
+ """
+ _OP_REQP = ["instance_name", "mode"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ if self.op.mode not in constants.EXPORT_MODES:
+ raise errors.OpPrereqError("Invalid export mode %r" % self.op.mode,
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ instance_name = self.op.instance_name
+
+ self.instance = self.cfg.GetInstanceInfo(instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckNodeOnline(self, self.instance.primary_node)
+
+ self._cds = _GetClusterDomainSecret()
+
+ def Exec(self, feedback_fn):
+ """Prepares an instance for an export.
+
+ """
+ instance = self.instance
+
+ if self.op.mode == constants.EXPORT_MODE_REMOTE:
+ salt = utils.GenerateSecret(8)
+
+ feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
+ result = self.rpc.call_x509_cert_create(instance.primary_node,
+ constants.RIE_CERT_VALIDITY)
+ result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+ (name, cert_pem) = result.payload
+
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ cert_pem)
+
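+      # the key name is HMAC-signed with the cluster domain secret so that the
+      # subsequent export operation can verify it was issued by this step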
+ return {
+ "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
+ "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
+ salt),
+ "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
+ }
+
+ return None
+
+
class LUExportInstance(LogicalUnit):
"""Export an instance to an image in the cluster.
"""Check the arguments.
"""
+ _CheckBooleanOpField(self.op, "remove_instance")
+ _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
constants.DEFAULT_SHUTDOWN_TIMEOUT)
+ self.remove_instance = getattr(self.op, "remove_instance", False)
+ self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+ False)
+ self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
+ self.x509_key_name = getattr(self.op, "x509_key_name", None)
+ self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
+
+ if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down first", errors.ECODE_INVAL)
+
+ if self.export_mode not in constants.EXPORT_MODES:
+ raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
+ errors.ECODE_INVAL)
+
+ if self.export_mode == constants.EXPORT_MODE_REMOTE:
+ if not self.x509_key_name:
+ raise errors.OpPrereqError("Missing X509 key name for encryption",
+ errors.ECODE_INVAL)
+
+ if not self.dest_x509_ca_pem:
+ raise errors.OpPrereqError("Missing destination X509 CA",
+ errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
- # FIXME: lock only instance primary and destination node
- #
- # Sad but true, for now we have do lock all nodes, as we don't know where
- # the previous export might be, and and in this LU we search for it and
- # remove it from its current node. In the future we could fix this by:
- # - making a tasklet to search (share-lock all), then create the new one,
- # then one to remove, after
- # - removing the removal operation altogether
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ # Lock all nodes for local exports
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ # FIXME: lock only instance primary and destination node
+ #
+    # Sad but true, for now we have to lock all nodes, as we don't know where
+ # the previous export might be, and in this LU we search for it and
+ # remove it from its current node. In the future we could fix this by:
+ # - making a tasklet to search (share-lock all), then create the new one,
+ # then one to remove, after
+ # - removing the removal operation altogether
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
"""Last minute lock declaration."""
"""
env = {
+ "EXPORT_MODE": self.export_mode,
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
"SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+ # TODO: Generic function for boolean env variables
+ "REMOVE_INSTANCE": str(bool(self.remove_instance)),
}
+
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
- self.op.target_node]
+
+ nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ nl.append(self.op.target_node)
+
return env, nl, nl
def CheckPrereq(self):
"""
instance_name = self.op.instance_name
+
self.instance = self.cfg.GetInstanceInfo(instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
- self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
- self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
- assert self.dst_node is not None
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+ self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+ assert self.dst_node is not None
+
+ _CheckNodeOnline(self, self.dst_node.name)
+ _CheckNodeNotDrained(self, self.dst_node.name)
+
+ self._cds = None
+ self.dest_x509_ca = None
+
+ elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+ self.dst_node = None
+
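+      # for remote exports, self.op.target_node holds one entry of destination
+      # information per instance disk instead of a node name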
+ if len(self.op.target_node) != len(self.instance.disks):
+ raise errors.OpPrereqError(("Received destination information for %s"
+ " disks, but instance %s has %s disks") %
+ (len(self.op.target_node), instance_name,
+ len(self.instance.disks)),
+ errors.ECODE_INVAL)
+
+ cds = _GetClusterDomainSecret()
+
+ # Check X509 key name
+ try:
+ (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+ except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
+                                   errors.ECODE_INVAL)
+
+ if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+ raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+ errors.ECODE_INVAL)
- _CheckNodeOnline(self, self.dst_node.name)
- _CheckNodeNotDrained(self, self.dst_node.name)
+ # Load and verify CA
+ try:
+ (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+ except OpenSSL.crypto.Error, err:
+ raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+ (err, ), errors.ECODE_INVAL)
+
+ (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+ if errcode is not None:
+ raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
+ errors.ECODE_INVAL)
+
+ self.dest_x509_ca = cert
+
+ # Verify target information
+ for idx, disk_data in enumerate(self.op.target_node):
+ try:
+ masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+ except errors.GenericError, err:
+ raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
+ errors.ECODE_INVAL)
+
+ else:
+ raise errors.ProgrammerError("Unhandled export mode %r" %
+ self.export_mode)
# instance disk type verification
+ # TODO: Implement export support for file-based disks
for disk in self.instance.disks:
if disk.dev_type == constants.LD_FILE:
raise errors.OpPrereqError("Export not supported for instances with"
" file-based disks", errors.ECODE_INVAL)
+ def _CleanupExports(self, feedback_fn):
+ """Removes exports of current instance from all other nodes.
+
+ If an instance in a cluster with nodes A..D was exported to node C, its
+ exports will be removed from the nodes A, B and D.
+
+ """
+ assert self.export_mode != constants.EXPORT_MODE_REMOTE
+
+ nodelist = self.cfg.GetNodeList()
+ nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal; if we
+    # proceeded, the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+ iname = self.instance.name
+ if nodelist:
+ feedback_fn("Removing old exports for instance %s" % iname)
+ exportlist = self.rpc.call_export_list(nodelist)
+ for node in exportlist:
+ if exportlist[node].fail_msg:
+ continue
+ if iname in exportlist[node].payload:
+ msg = self.rpc.call_export_remove(node, iname).fail_msg
+ if msg:
+ self.LogWarning("Could not remove older export for instance %s"
+ " on node %s: %s", iname, node, msg)
+
def Exec(self, feedback_fn):
"""Export an instance to an image in the cluster.
"""
+ assert self.export_mode in constants.EXPORT_MODES
+
instance = self.instance
- dst_node = self.dst_node
src_node = instance.primary_node
if self.op.shutdown:
feedback_fn("Shutting down instance %s" % instance.name)
result = self.rpc.call_instance_shutdown(src_node, instance,
self.shutdown_timeout)
+ # TODO: Maybe ignore failures if ignore_remove_failures is set
result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node))
- vgname = self.cfg.GetVGName()
-
- snap_disks = []
-
# set the disks ID correctly since call_instance_start needs the
# correct drbd minor to create the symlinks
for disk in instance.disks:
_StartInstanceDisks(self, instance, None)
try:
- # per-disk results
- dresults = []
+ helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
+ instance)
+
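+      # snapshot the instance disks; the helper keeps track of them so they
+      # can be cleaned up in the finally clause below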
+      helper.CreateSnapshots()
try:
- for idx, disk in enumerate(instance.disks):
- feedback_fn("Creating a snapshot of disk/%s on node %s" %
- (idx, src_node))
-
- # result.payload will be a snapshot of an lvm leaf of the one we
- # passed
- result = self.rpc.call_blockdev_snapshot(src_node, disk)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Could not snapshot disk/%s on node %s: %s",
- idx, src_node, msg)
- snap_disks.append(False)
- else:
- disk_id = (vgname, result.payload)
- new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
- logical_id=disk_id, physical_id=disk_id,
- iv_name=disk.iv_name)
- snap_disks.append(new_dev)
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+ elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+ connect_timeout = constants.RIE_CONNECT_TIMEOUT
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+ (key_name, _, _) = self.x509_key_name
+
+ dest_ca_pem = \
+ OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ self.dest_x509_ca)
+ opts = objects.ImportExportOptions(key_name=key_name,
+ ca_pem=dest_ca_pem)
+
+ (fin_resu, dresults) = helper.RemoteExport(opts, self.op.target_node,
+ timeouts)
finally:
- if self.op.shutdown and instance.admin_up:
- feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance, None, None)
- msg = result.fail_msg
- if msg:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Could not start instance: %s" % msg)
-
- # TODO: check for size
-
- cluster_name = self.cfg.GetClusterName()
- for idx, dev in enumerate(snap_disks):
- feedback_fn("Exporting snapshot %s from %s to %s" %
- (idx, src_node, dst_node.name))
- if dev:
- # FIXME: pass debug from opcode to backend
- result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
- instance, cluster_name,
- idx, self.op.debug_level)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Could not export disk/%s from node %s to"
- " node %s: %s", idx, src_node, dst_node.name, msg)
- dresults.append(False)
- else:
- dresults.append(True)
- msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
- if msg:
- self.LogWarning("Could not remove snapshot for disk/%d from node"
- " %s: %s", idx, src_node, msg)
- else:
- dresults.append(False)
+ helper.Cleanup()
- feedback_fn("Finalizing export on %s" % dst_node.name)
- result = self.rpc.call_finalize_export(dst_node.name, instance,
- snap_disks)
- fin_resu = True
- msg = result.fail_msg
- if msg:
- self.LogWarning("Could not finalize export for instance %s"
- " on node %s: %s", instance.name, dst_node.name, msg)
- fin_resu = False
+ # Check for backwards compatibility
+ assert len(dresults) == len(instance.disks)
+ assert compat.all(isinstance(i, bool) for i in dresults), \
+ "Not all results are boolean: %r" % dresults
finally:
if activate_disks:
feedback_fn("Deactivating disks for %s" % instance.name)
_ShutdownInstanceDisks(self, instance)
- nodelist = self.cfg.GetNodeList()
- nodelist.remove(dst_node.name)
+ # Remove instance if requested
+ if self.remove_instance:
+ if not (compat.all(dresults) and fin_resu):
+ feedback_fn("Not removing instance %s as parts of the export failed" %
+ instance.name)
+ else:
+ feedback_fn("Removing instance %s" % instance.name)
+ _RemoveInstance(self, feedback_fn, instance,
+ self.ignore_remove_failures)
+
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ self._CleanupExports(feedback_fn)
- # on one-node clusters nodelist will be empty after the removal
- # if we proceed the backup would be removed because OpQueryExports
- # substitutes an empty list with the full cluster node list.
- iname = instance.name
- if nodelist:
- feedback_fn("Removing old exports for instance %s" % iname)
- exportlist = self.rpc.call_export_list(nodelist)
- for node in exportlist:
- if exportlist[node].fail_msg:
- continue
- if iname in exportlist[node].payload:
- msg = self.rpc.call_export_remove(node, iname).fail_msg
- if msg:
- self.LogWarning("Could not remove older export for instance %s"
- " on node %s: %s", iname, node, msg)
return fin_resu, dresults