from ganeti import objects
from ganeti import serializer
from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
class LogicalUnit(object):
_CheckOSVariant(result.payload, os_name)
+def _RequireFileStorage():
+ """Checks that file storage is enabled.
+
+ @raise errors.OpPrereqError: when file storage is disabled
+
+ """
+ if not constants.ENABLE_FILE_STORAGE:
+ raise errors.OpPrereqError("File storage disabled at configure time",
+ errors.ECODE_INVAL)
+
+
def _CheckDiskTemplate(template):
"""Ensure a given disk template is valid.
msg = ("Invalid disk template name '%s', valid templates are: %s" %
(template, utils.CommaJoin(constants.DISK_TEMPLATES)))
raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
- if template == constants.DT_FILE and not constants.ENABLE_FILE_STORAGE:
- raise errors.OpPrereqError("File storage disabled at configure time",
+ if template == constants.DT_FILE:
+ _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+ """Ensure a given storage type is valid.
+
+ """
+ if storage_type not in constants.VALID_STORAGE_TYPES:
+ raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
errors.ECODE_INVAL)
+ if storage_type == constants.ST_FILE:
+ _RequireFileStorage()
+
def _CheckInstanceDown(lu, instance, reason):
return faulty
-def _FormatTimestamp(secs):
- """Formats a Unix timestamp with the local timezone.
-
- """
- return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
class LUPostInitCluster(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
return master
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
- warn_days=constants.SSL_CERT_EXPIRATION_WARN,
- error_days=constants.SSL_CERT_EXPIRATION_ERROR):
- """Verifies certificate details for LUVerifyCluster.
-
- """
- if expired:
- msg = "Certificate %s is expired" % filename
-
- if not_before is not None and not_after is not None:
- msg += (" (valid from %s to %s)" %
- (_FormatTimestamp(not_before),
- _FormatTimestamp(not_after)))
- elif not_before is not None:
- msg += " (valid from %s)" % _FormatTimestamp(not_before)
- elif not_after is not None:
- msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- elif not_before is not None and not_before > now:
- return (LUVerifyCluster.ETYPE_WARNING,
- "Certificate %s not yet valid (valid from %s)" %
- (filename, _FormatTimestamp(not_before)))
-
- elif not_after is not None:
- remaining_days = int((not_after - now) / (24 * 3600))
-
- msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
- if remaining_days <= error_days:
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- if remaining_days <= warn_days:
- return (LUVerifyCluster.ETYPE_WARNING, msg)
-
- return (None, None)
-
-
def _VerifyCertificate(filename):
"""Verifies a certificate for LUVerifyCluster.
return (LUVerifyCluster.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
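+  # translate the error code from utils into the error severities used by
+  # LUVerifyCluster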
+ if errcode is None:
+ return (None, fnamemsg)
+ elif errcode == utils.CERT_WARNING:
+ return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+ elif errcode == utils.CERT_ERROR:
+ return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+ raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
class LUVerifyCluster(LogicalUnit):
vg_name = self.cfg.GetVGName()
hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+ cluster = self.cfg.GetClusterInfo()
nodelist = utils.NiceSort(self.cfg.GetNodeList())
nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
instancelist = utils.NiceSort(self.cfg.GetInstanceList())
file_names = ssconf.SimpleStore().GetFileList()
file_names.extend(constants.ALL_CERT_FILES)
file_names.extend(master_files)
+ if cluster.modify_etc_hosts:
+ file_names.append(constants.ETC_HOSTS)
local_checksums = utils.FingerprintFiles(file_names)
self.cfg.GetClusterName())
nvinfo_endtime = time.time()
- cluster = self.cfg.GetClusterInfo()
master_node = self.cfg.GetMasterNode()
all_drbd_map = self.cfg.ComputeDRBDMap()
"""Check parameters
"""
- if not hasattr(self.op, "candidate_pool_size"):
- self.op.candidate_pool_size = None
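+    # make sure all optional opcode parameters exist, defaulting to None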
+ for attr in ["candidate_pool_size",
+ "uid_pool", "add_uids", "remove_uids"]:
+ if not hasattr(self.op, attr):
+ setattr(self.op, attr, None)
+
if self.op.candidate_pool_size is not None:
try:
self.op.candidate_pool_size = int(self.op.candidate_pool_size)
if self.op.candidate_pool_size < 1:
raise errors.OpPrereqError("At least one master candidate needed",
errors.ECODE_INVAL)
+
_CheckBooleanOpField(self.op, "maintain_node_health")
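+    # validate the UID pool parameters, if given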
+ if self.op.uid_pool:
+ uidpool.CheckUidPool(self.op.uid_pool)
+
+ if self.op.add_uids:
+ uidpool.CheckUidPool(self.op.add_uids)
+
+ if self.op.remove_uids:
+ uidpool.CheckUidPool(self.op.remove_uids)
+
def ExpandNames(self):
# FIXME: in the future maybe other cluster params won't require checking on
# all nodes to be modified.
"\n".join(nic_errors))
# hypervisor list/parameters
- self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+ self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
if self.op.hvparams:
if not isinstance(self.op.hvparams, dict):
raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
else:
self.new_os_hvp[os_name][hv_name].update(hv_dict)
+ # changes to the hypervisor list
if self.op.enabled_hypervisors is not None:
self.hv_list = self.op.enabled_hypervisors
if not self.hv_list:
" entries: %s" %
utils.CommaJoin(invalid_hvs),
errors.ECODE_INVAL)
+ for hv in self.hv_list:
+ # if the hypervisor doesn't already exist in the cluster
+ # hvparams, we initialize it to empty, and then (in both
+ # cases) we make sure to fill the defaults, as we might not
+ # have a complete defaults list if the hypervisor wasn't
+ # enabled before
+ if hv not in new_hvp:
+ new_hvp[hv] = {}
+ new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+ utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
else:
self.hv_list = cluster.enabled_hypervisors
if self.op.os_hvp:
self.cluster.os_hvp = self.new_os_hvp
if self.op.enabled_hypervisors is not None:
+ self.cluster.hvparams = self.new_hvparams
self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
if self.op.beparams:
self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
if self.op.maintain_node_health is not None:
self.cluster.maintain_node_health = self.op.maintain_node_health
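+    # UID pool changes: additions and removals are applied first; a full
+    # replacement pool, if given, takes precedence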
+ if self.op.add_uids is not None:
+ uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+ if self.op.remove_uids is not None:
+ uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+ if self.op.uid_pool is not None:
+ self.cluster.uid_pool = self.op.uid_pool
+
self.cfg.Update(self.cluster, feedback_fn)
self.LogWarning("Errors encountered on the remote node while leaving"
" the cluster: %s", msg)
+ # Remove node from our /etc/hosts
+ if self.cfg.GetClusterInfo().modify_etc_hosts:
+ # FIXME: this should be done via an rpc call to node daemon
+ utils.RemoveHostFromEtcHosts(node.name)
+ _RedistributeAncillaryFiles(self)
+
class LUQueryNodes(NoHooksLU):
"""Logical unit for querying nodes.
REQ_BGL = False
_FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
- def ExpandNames(self):
- storage_type = self.op.storage_type
-
- if storage_type not in constants.VALID_STORAGE_TYPES:
- raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
- errors.ECODE_INVAL)
+ def CheckArguments(self):
+ _CheckStorageType(self.op.storage_type)
_CheckOutputFields(static=self._FIELDS_STATIC,
dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
selected=self.op.output_fields)
+ def ExpandNames(self):
self.needed_locks = {}
self.share_locks[locking.LEVEL_NODE] = 1
def CheckArguments(self):
self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
- storage_type = self.op.storage_type
- if storage_type not in constants.VALID_STORAGE_TYPES:
- raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
- errors.ECODE_INVAL)
+ _CheckStorageType(self.op.storage_type)
def ExpandNames(self):
self.needed_locks = {
raise errors.OpPrereqError("Node %s is not in the configuration" % node,
errors.ECODE_NOENT)
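+    # when readding a node, its primary IP may change; only the secondary
+    # IP is required to stay the same (verified below)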
+ self.changed_primary_ip = False
+
for existing_node_name in node_list:
existing_node = cfg.GetNodeInfo(existing_node_name)
if self.op.readd and node == existing_node_name:
- if (existing_node.primary_ip != primary_ip or
- existing_node.secondary_ip != secondary_ip):
+        if existing_node.secondary_ip != secondary_ip:
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before",
                                     errors.ECODE_INVAL)
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
        continue
if (existing_node.primary_ip == primary_ip or
self.LogInfo("Readding a node, the offline/drained flags were reset")
# if we demote the node, we do cleanup later in the procedure
new_node.master_candidate = self.master_candidate
+ if self.changed_primary_ip:
+ new_node.primary_ip = self.op.primary_ip
# notify the user about any possible mc promotion
if new_node.master_candidate:
# Add node to our /etc/hosts, and add key to known_hosts
if self.cfg.GetClusterInfo().modify_etc_hosts:
+ # FIXME: this should be done via an rpc call to node daemon
utils.AddHostToEtcHosts(new_node.name)
if new_node.secondary_ip != new_node.primary_ip:
"mtime": cluster.mtime,
"uuid": cluster.uuid,
"tags": list(cluster.GetTags()),
+ "uid_pool": cluster.uid_pool,
}
return result
if len(secondary_nodes) != 0:
raise errors.ProgrammerError("Wrong template configuration")
+ _RequireFileStorage()
+
for idx, disk in enumerate(disk_info):
disk_index = idx + base_index
disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
else:
network_port = None
- # this is needed because os.path.join does not accept None arguments
- if self.op.file_storage_dir is None:
- string_file_storage_dir = ""
- else:
- string_file_storage_dir = self.op.file_storage_dir
+ if constants.ENABLE_FILE_STORAGE:
+ # this is needed because os.path.join does not accept None arguments
+ if self.op.file_storage_dir is None:
+ string_file_storage_dir = ""
+ else:
+ string_file_storage_dir = self.op.file_storage_dir
- # build the full file storage dir path
- file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
- string_file_storage_dir, instance)
+ # build the full file storage dir path
+ file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+ string_file_storage_dir, instance)
+ else:
+ file_storage_dir = ""
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
elif self.op.mode == constants.INSTANCE_IMPORT:
feedback_fn("* running the instance OS import scripts...")
-      src_node = self.op.src_node
-      src_images = self.src_images
-      cluster_name = self.cfg.GetClusterName()
-      # FIXME: pass debug option from opcode to backend
-      import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                        src_node, src_images,
-                                                        cluster_name,
-                                                        self.op.debug_level)
-      msg = import_result.fail_msg
-      if msg:
-        self.LogWarning("Error while importing the disk images for instance"
-                        " %s on node %s: %s" % (instance, pnode_name, msg))
+
+ transfers = []
+
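+      # build one transfer per disk image; disks without a corresponding
+      # image are skipped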
+      for idx, image in enumerate(self.src_images):
+        if not image:
+          continue
+
+        # FIXME: pass debug option from opcode to backend
+        dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                           constants.IEIO_FILE, (image, ),
+                                           constants.IEIO_SCRIPT,
+                                           (iobj.disks[idx], idx),
+                                           None)
+        transfers.append(dt)
+
+      import_result = \
+        masterd.instance.TransferInstanceData(self, feedback_fn,
+                                              self.op.src_node, pnode_name,
+                                              self.pnode.secondary_ip,
+                                              iobj, transfers)
+      if not compat.all(import_result):
+        self.LogWarning("Some disks for instance %s on node %s were not"
+                        " imported successfully" % (instance, pnode_name))
+
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
def CheckArguments(self):
self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+ _CheckStorageType(self.op.storage_type)
+
def ExpandNames(self):
self.needed_locks = {
locking.LEVEL_NODE: [self.op.node_name],
raise errors.OpPrereqError("Export not supported for instances with"
" file-based disks", errors.ECODE_INVAL)
+ def _CreateSnapshots(self, feedback_fn):
+ """Creates an LVM snapshot for every disk of the instance.
+
+ @return: List of snapshots as L{objects.Disk} instances
+
+ """
+ instance = self.instance
+ src_node = instance.primary_node
+
+ vgname = self.cfg.GetVGName()
+
+ snap_disks = []
+
+ for idx, disk in enumerate(instance.disks):
+ feedback_fn("Creating a snapshot of disk/%s on node %s" %
+ (idx, src_node))
+
+ # result.payload will be a snapshot of an lvm leaf of the one we
+ # passed
+ result = self.rpc.call_blockdev_snapshot(src_node, disk)
+ msg = result.fail_msg
+ if msg:
+ self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+ idx, src_node, msg)
+ snap_disks.append(False)
+ else:
+ disk_id = (vgname, result.payload)
+ new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+ logical_id=disk_id, physical_id=disk_id,
+ iv_name=disk.iv_name)
+ snap_disks.append(new_dev)
+
+ return snap_disks
+
+ def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+ """Removes an LVM snapshot.
+
+ @type snap_disks: list
+ @param snap_disks: The list of all snapshots as returned by
+ L{_CreateSnapshots}
+ @type disk_index: number
+ @param disk_index: Index of the snapshot to be removed
+ @rtype: bool
+ @return: Whether removal was successful or not
+
+ """
+ disk = snap_disks[disk_index]
+ if disk:
+ src_node = self.instance.primary_node
+
+ feedback_fn("Removing snapshot of disk/%s on node %s" %
+ (disk_index, src_node))
+
+ result = self.rpc.call_blockdev_remove(src_node, disk)
+ if not result.fail_msg:
+ return True
+
+ self.LogWarning("Could not remove snapshot for disk/%d from node"
+ " %s: %s", disk_index, src_node, result.fail_msg)
+
+ return False
+
+ def _CleanupExports(self, feedback_fn):
+ """Removes exports of current instance from all other nodes.
+
+ If an instance in a cluster with nodes A..D was exported to node C, its
+ exports will be removed from the nodes A, B and D.
+
+ """
+ nodelist = self.cfg.GetNodeList()
+ nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal above;
+    # if we proceeded, the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+ iname = self.instance.name
+ if nodelist:
+ feedback_fn("Removing old exports for instance %s" % iname)
+ exportlist = self.rpc.call_export_list(nodelist)
+ for node in exportlist:
+ if exportlist[node].fail_msg:
+ continue
+ if iname in exportlist[node].payload:
+ msg = self.rpc.call_export_remove(node, iname).fail_msg
+ if msg:
+ self.LogWarning("Could not remove older export for instance %s"
+ " on node %s: %s", iname, node, msg)
+
def Exec(self, feedback_fn):
"""Export an instance to an image in the cluster.
result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node))
- vgname = self.cfg.GetVGName()
-
- snap_disks = []
-
# set the disks ID correctly since call_instance_start needs the
# correct drbd minor to create the symlinks
for disk in instance.disks:
try:
# per-disk results
- dresults = []
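+      # tracks which snapshots have already been removed; any still-False
+      # entries are cleaned up in the "finally" clause below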
+ removed_snaps = [False] * len(instance.disks)
+
+ snap_disks = None
try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+ try:
+ snap_disks = self._CreateSnapshots(feedback_fn)
+ finally:
+ if (self.op.shutdown and instance.admin_up and
+ not self.remove_instance):
+ feedback_fn("Starting instance %s" % instance.name)
+ result = self.rpc.call_instance_start(src_node, instance,
+ None, None)
+ msg = result.fail_msg
+ if msg:
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Could not start instance: %s" % msg)
+
+ assert len(snap_disks) == len(instance.disks)
+ assert len(removed_snaps) == len(instance.disks)
+
+ # TODO: check for size
+
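+        # called whenever a transfer finishes, allowing each snapshot to be
+        # removed as soon as its data has been exported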
+ def _TransferFinished(idx):
+ logging.debug("Transfer %s finished", idx)
+ if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+ removed_snaps[idx] = True
+
+ transfers = []
+
-      finally:
-        if self.op.shutdown and instance.admin_up and not self.remove_instance:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-            msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-            if msg:
-              self.LogWarning("Could not remove snapshot for disk/%d from node"
-                              " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
+
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
+
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
finally:
if activate_disks:
feedback_fn("Removing instance %s" % instance.name)
_RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
- nodelist = self.cfg.GetNodeList()
- nodelist.remove(dst_node.name)
-
- # on one-node clusters nodelist will be empty after the removal
- # if we proceed the backup would be removed because OpQueryExports
- # substitutes an empty list with the full cluster node list.
- iname = instance.name
- if nodelist:
- feedback_fn("Removing old exports for instance %s" % iname)
- exportlist = self.rpc.call_export_list(nodelist)
- for node in exportlist:
- if exportlist[node].fail_msg:
- continue
- if iname in exportlist[node].payload:
- msg = self.rpc.call_export_remove(node, iname).fail_msg
- if msg:
- self.LogWarning("Could not remove older export for instance %s"
- " on node %s: %s", iname, node, msg)
+ self._CleanupExports(feedback_fn)
return fin_resu, dresults