from ganeti import ssconf
from ganeti import uidpool
from ganeti import compat
+from ganeti import masterd
+
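+# W0611 ("unused import") is disabled below because the explicit import is
+# what makes the masterd.instance submodule available through the masterd
+# namespace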
+import ganeti.masterd.instance # pylint: disable-msg=W0611
class LogicalUnit(object):
_RequireFileStorage()
+def _GetClusterDomainSecret():
+ """Reads the cluster domain secret.
+
+ """
+ return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
+ strict=True)
+
def _CheckInstanceDown(lu, instance, reason):
"""Ensure that an instance is not running."""
return faulty
-def _FormatTimestamp(secs):
- """Formats a Unix timestamp with the local timezone.
-
- """
- return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
class LUPostInitCluster(LogicalUnit):
"""Logical unit for running hooks after cluster initialization.
return master
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
- warn_days=constants.SSL_CERT_EXPIRATION_WARN,
- error_days=constants.SSL_CERT_EXPIRATION_ERROR):
- """Verifies certificate details for LUVerifyCluster.
-
- """
- if expired:
- msg = "Certificate %s is expired" % filename
-
- if not_before is not None and not_after is not None:
- msg += (" (valid from %s to %s)" %
- (_FormatTimestamp(not_before),
- _FormatTimestamp(not_after)))
- elif not_before is not None:
- msg += " (valid from %s)" % _FormatTimestamp(not_before)
- elif not_after is not None:
- msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- elif not_before is not None and not_before > now:
- return (LUVerifyCluster.ETYPE_WARNING,
- "Certificate %s not yet valid (valid from %s)" %
- (filename, _FormatTimestamp(not_before)))
-
- elif not_after is not None:
- remaining_days = int((not_after - now) / (24 * 3600))
-
- msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
- if remaining_days <= error_days:
- return (LUVerifyCluster.ETYPE_ERROR, msg)
-
- if remaining_days <= warn_days:
- return (LUVerifyCluster.ETYPE_WARNING, msg)
-
- return (None, None)
-
-
def _VerifyCertificate(filename):
"""Verifies a certificate for LUVerifyCluster.
return (LUVerifyCluster.ETYPE_ERROR,
"Failed to load X509 certificate %s: %s" % (filename, err))
- # Depending on the pyOpenSSL version, this can just return (None, None)
- (not_before, not_after) = utils.GetX509CertValidity(cert)
+ (errcode, msg) = \
+ utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+ constants.SSL_CERT_EXPIRATION_ERROR)
- return _VerifyCertificateInner(filename, cert.has_expired(),
- not_before, not_after, time.time())
+ if msg:
+ fnamemsg = "While verifying %s: %s" % (filename, msg)
+ else:
+ fnamemsg = None
+
+ if errcode is None:
+ return (None, fnamemsg)
+ elif errcode == utils.CERT_WARNING:
+ return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+ elif errcode == utils.CERT_ERROR:
+ return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+ raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
class LUVerifyCluster(LogicalUnit):
constants.RAPI_CERT_FILE,
constants.RAPI_USERS_FILE,
constants.CONFD_HMAC_KEY,
+ constants.CLUSTER_DOMAIN_SECRET_FILE,
])
enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
if mstat.sync_percent is not None:
done = False
if mstat.estimated_time is not None:
- rem_time = "%d estimated seconds remaining" % mstat.estimated_time
+ rem_time = ("%s remaining (estimated)" %
+ utils.FormatSeconds(mstat.estimated_time))
max_time = mstat.estimated_time
else:
rem_time = "no time estimate"
" node %s: %s" %
(instance.name, instance.primary_node, msg))
- logging.info("Removing block devices for instance %s", instance.name)
+ _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
- if not _RemoveDisks(self, instance):
- if self.op.ignore_failures:
- feedback_fn("Warning: can't remove instance's disks")
- else:
- raise errors.OpExecError("Can't remove instance's disks")
- logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+ """Utility function to remove an instance.
+
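+  @type ignore_failures: boolean
+  @param ignore_failures: whether a failure to remove the instance's disks
+      is only reported as a warning instead of raising an error
+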
+ """
+ logging.info("Removing block devices for instance %s", instance.name)
+
+ if not _RemoveDisks(lu, instance):
+ if not ignore_failures:
+ raise errors.OpExecError("Can't remove instance's disks")
+ feedback_fn("Warning: can't remove instance's disks")
+
+ logging.info("Removing instance %s out of cluster config", instance.name)
- self.cfg.RemoveInstance(instance.name)
- self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+ lu.cfg.RemoveInstance(instance.name)
+
+ assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+ "Instance lock removal conflict"
+
+ # Remove lock for the instance
+ lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
class LUQueryInstances(NoHooksLU):
self.adopt_disks = has_adopt
# verify creation mode
- if self.op.mode not in (constants.INSTANCE_CREATE,
- constants.INSTANCE_IMPORT):
+ if self.op.mode not in constants.INSTANCE_CREATE_MODES:
raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
self.op.mode, errors.ECODE_INVAL)
self.op.instance_name = self.hostname1.name
# used in CheckPrereq for ip ping check
self.check_ip = self.hostname1.ip
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ raise errors.OpPrereqError("Remote imports require names to be checked" %
+ errors.ECODE_INVAL)
else:
self.check_ip = None
" node must be given",
errors.ECODE_INVAL)
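+    # The cluster domain secret is needed to verify the remote-import
+    # handshake and the signed source CA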
+ self._cds = _GetClusterDomainSecret()
+
if self.op.mode == constants.INSTANCE_IMPORT:
# On import force_variant must be True, because if we forced it at
# initial install, our only chance when importing it back is that it
if self.op.no_install:
self.LogInfo("No-installation mode has no effect during import")
- else: # INSTANCE_CREATE
+ elif self.op.mode == constants.INSTANCE_CREATE:
if getattr(self.op, "os_type", None) is None:
raise errors.OpPrereqError("No guest OS specified",
errors.ECODE_INVAL)
raise errors.OpPrereqError("No disk template specified",
errors.ECODE_INVAL)
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ # Check handshake to ensure both clusters have the same domain secret
+ src_handshake = getattr(self.op, "source_handshake", None)
+ if not src_handshake:
+ raise errors.OpPrereqError("Missing source handshake",
+ errors.ECODE_INVAL)
+
+ errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
+ src_handshake)
+ if errmsg:
+ raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
+ errors.ECODE_INVAL)
+
+ # Load and check source CA
+ self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
+ if not self.source_x509_ca_pem:
+ raise errors.OpPrereqError("Missing source X509 CA",
+ errors.ECODE_INVAL)
+
+ try:
+ (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
+ self._cds)
+ except OpenSSL.crypto.Error, err:
+ raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
+ (err, ), errors.ECODE_INVAL)
+
+ (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+ if errcode is not None:
+ raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
+ errors.ECODE_INVAL)
+
+ self.source_x509_ca = cert
+
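+      # The source instance name is needed later on to run the OS rename
+      # script on the newly imported instance (see Exec)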
+ src_instance_name = getattr(self.op, "source_instance_name", None)
+ if not src_instance_name:
+ raise errors.OpPrereqError("Missing source instance name",
+ errors.ECODE_INVAL)
+
+ self.source_instance_name = \
+ utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+
+ else:
+ raise errors.OpPrereqError("Invalid instance creation mode %r" %
+ self.op.mode, errors.ECODE_INVAL)
+
def ExpandNames(self):
"""ExpandNames for CreateInstance.
else:
file_storage_dir = ""
-
disks = _GenerateDiskTemplate(self,
self.op.disk_template,
instance, pnode_name,
elif self.op.mode == constants.INSTANCE_IMPORT:
feedback_fn("* running the instance OS import scripts...")
- src_node = self.op.src_node
- src_images = self.src_images
- cluster_name = self.cfg.GetClusterName()
- # FIXME: pass debug option from opcode to backend
- import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
- src_node, src_images,
- cluster_name,
- self.op.debug_level)
- msg = import_result.fail_msg
- if msg:
- self.LogWarning("Error while importing the disk images for instance"
- " %s on node %s: %s" % (instance, pnode_name, msg))
+
+ transfers = []
+
+ for idx, image in enumerate(self.src_images):
+ if not image:
+ continue
+
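+      # Read the source image file and feed it to the OS import script for
+      # the matching instance disk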
+ # FIXME: pass debug option from opcode to backend
+ dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+ constants.IEIO_FILE, (image, ),
+ constants.IEIO_SCRIPT,
+ (iobj.disks[idx], idx),
+ None)
+ transfers.append(dt)
+
+ import_result = \
+ masterd.instance.TransferInstanceData(self, feedback_fn,
+ self.op.src_node, pnode_name,
+ self.pnode.secondary_ip,
+ iobj, transfers)
+ if not compat.all(import_result):
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+ feedback_fn("* preparing remote import...")
+ connect_timeout = constants.RIE_CONNECT_TIMEOUT
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+ disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
+ self.source_x509_ca,
+ self._cds, timeouts)
+ if not compat.all(disk_results):
+ # TODO: Should the instance still be started, even if some disks
+ # failed to import (valid for local imports, too)?
+ self.LogWarning("Some disks for instance %s on node %s were not"
+ " imported successfully" % (instance, pnode_name))
+
+ # Run rename script on newly imported instance
+ assert iobj.name == instance
+ feedback_fn("Running rename script for %s" % instance)
+ result = self.rpc.call_instance_run_rename(pnode_name, iobj,
+ self.source_instance_name,
+ self.op.debug_level)
+ if result.fail_msg:
+ self.LogWarning("Failed to run rename script for %s on node"
+ " %s: %s" % (instance, pnode_name, result.fail_msg))
+
else:
# also checked in the prereq part
raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
(constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
}
+
class LUQueryExports(NoHooksLU):
"""Query the exports list
return result
+class LUPrepareExport(NoHooksLU):
+ """Prepares an instance for an export and returns useful information.
+
+ """
+ _OP_REQP = ["instance_name", "mode"]
+ REQ_BGL = False
+
+ def CheckArguments(self):
+ """Check the arguments.
+
+ """
+ if self.op.mode not in constants.EXPORT_MODES:
+ raise errors.OpPrereqError("Invalid export mode %r" % self.op.mode,
+ errors.ECODE_INVAL)
+
+ def ExpandNames(self):
+ self._ExpandAndLockInstance()
+
+ def CheckPrereq(self):
+ """Check prerequisites.
+
+ """
+ instance_name = self.op.instance_name
+
+ self.instance = self.cfg.GetInstanceInfo(instance_name)
+ assert self.instance is not None, \
+ "Cannot retrieve locked instance %s" % self.op.instance_name
+ _CheckNodeOnline(self, self.instance.primary_node)
+
+ self._cds = _GetClusterDomainSecret()
+
+ def Exec(self, feedback_fn):
+ """Prepares an instance for an export.
+
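+    @return: for remote exports, a dictionary with the handshake, the
+        HMAC-signed X509 key name and the signed X509 CA; C{None} for all
+        other export modes
+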
+ """
+ instance = self.instance
+
+ if self.op.mode == constants.EXPORT_MODE_REMOTE:
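+      # Random salt for the HMAC signatures computed below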
+ salt = utils.GenerateSecret(8)
+
+ feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
+ result = self.rpc.call_x509_cert_create(instance.primary_node,
+ constants.RIE_CERT_VALIDITY)
+ result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+ (name, cert_pem) = result.payload
+
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ cert_pem)
+
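+      # Both the key name and the CA are signed with the cluster domain
+      # secret; LUExportInstance.CheckPrereq verifies these signatures before
+      # starting the actual export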
+ return {
+ "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
+ "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
+ salt),
+ "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
+ }
+
+ return None
+
+
class LUExportInstance(LogicalUnit):
"""Export an instance to an image in the cluster.
"""Check the arguments.
"""
+ _CheckBooleanOpField(self.op, "remove_instance")
+ _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
constants.DEFAULT_SHUTDOWN_TIMEOUT)
+ self.remove_instance = getattr(self.op, "remove_instance", False)
+ self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+ False)
+ self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
+ self.x509_key_name = getattr(self.op, "x509_key_name", None)
+ self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
+
+ if self.remove_instance and not self.op.shutdown:
+ raise errors.OpPrereqError("Can not remove instance without shutting it"
+ " down before")
+
+ if self.export_mode not in constants.EXPORT_MODES:
+ raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
+ errors.ECODE_INVAL)
+
+ if self.export_mode == constants.EXPORT_MODE_REMOTE:
+ if not self.x509_key_name:
+ raise errors.OpPrereqError("Missing X509 key name for encryption",
+ errors.ECODE_INVAL)
+
+ if not self.dest_x509_ca_pem:
+ raise errors.OpPrereqError("Missing destination X509 CA",
+ errors.ECODE_INVAL)
def ExpandNames(self):
self._ExpandAndLockInstance()
- # FIXME: lock only instance primary and destination node
- #
- # Sad but true, for now we have do lock all nodes, as we don't know where
- # the previous export might be, and and in this LU we search for it and
- # remove it from its current node. In the future we could fix this by:
- # - making a tasklet to search (share-lock all), then create the new one,
- # then one to remove, after
- # - removing the removal operation altogether
- self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+ # Lock all nodes for local exports
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ # FIXME: lock only instance primary and destination node
+ #
+      # Sad but true, for now we have to lock all nodes, as we don't know where
+ # the previous export might be, and in this LU we search for it and
+ # remove it from its current node. In the future we could fix this by:
+ # - making a tasklet to search (share-lock all), then create the new one,
+ # then one to remove, after
+ # - removing the removal operation altogether
+ self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
def DeclareLocks(self, level):
"""Last minute lock declaration."""
"""
env = {
+ "EXPORT_MODE": self.export_mode,
"EXPORT_NODE": self.op.target_node,
"EXPORT_DO_SHUTDOWN": self.op.shutdown,
"SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+ # TODO: Generic function for boolean env variables
+ "REMOVE_INSTANCE": str(bool(self.remove_instance)),
}
+
env.update(_BuildInstanceHookEnvByObject(self, self.instance))
- nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
- self.op.target_node]
+
+ nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ nl.append(self.op.target_node)
+
return env, nl, nl
def CheckPrereq(self):
"""
instance_name = self.op.instance_name
+
self.instance = self.cfg.GetInstanceInfo(instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
_CheckNodeOnline(self, self.instance.primary_node)
- self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
- self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
- assert self.dst_node is not None
-
- _CheckNodeOnline(self, self.dst_node.name)
- _CheckNodeNotDrained(self, self.dst_node.name)
-
- # instance disk type verification
- for disk in self.instance.disks:
- if disk.dev_type == constants.LD_FILE:
- raise errors.OpPrereqError("Export not supported for instances with"
- " file-based disks", errors.ECODE_INVAL)
-
- def _CreateSnapshots(self, feedback_fn):
- """Creates an LVM snapshot for every disk of the instance.
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+ self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+ assert self.dst_node is not None
- @return: List of snapshots as L{objects.Disk} instances
+ _CheckNodeOnline(self, self.dst_node.name)
+ _CheckNodeNotDrained(self, self.dst_node.name)
- """
- instance = self.instance
- src_node = instance.primary_node
+ self._cds = None
+ self.dest_disk_info = None
+ self.dest_x509_ca = None
- vgname = self.cfg.GetVGName()
+ elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+ self.dst_node = None
- snap_disks = []
+ if len(self.op.target_node) != len(self.instance.disks):
+ raise errors.OpPrereqError(("Received destination information for %s"
+ " disks, but instance %s has %s disks") %
+ (len(self.op.target_node), instance_name,
+ len(self.instance.disks)),
+ errors.ECODE_INVAL)
- for idx, disk in enumerate(instance.disks):
- feedback_fn("Creating a snapshot of disk/%s on node %s" %
- (idx, src_node))
+ cds = _GetClusterDomainSecret()
- # result.payload will be a snapshot of an lvm leaf of the one we
- # passed
- result = self.rpc.call_blockdev_snapshot(src_node, disk)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Could not snapshot disk/%s on node %s: %s",
- idx, src_node, msg)
- snap_disks.append(False)
- else:
- disk_id = (vgname, result.payload)
- new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
- logical_id=disk_id, physical_id=disk_id,
- iv_name=disk.iv_name)
- snap_disks.append(new_dev)
+ # Check X509 key name
+ try:
+ (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+ except (TypeError, ValueError), err:
+ raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
- return snap_disks
+ if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+ raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+ errors.ECODE_INVAL)
- def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
- """Removes an LVM snapshot.
+ # Load and verify CA
+ try:
+ (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+ except OpenSSL.crypto.Error, err:
+ raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+ (err, ), errors.ECODE_INVAL)
+
+ (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+ if errcode is not None:
+ raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
+ errors.ECODE_INVAL)
- @type snap_disks: list
- @param snap_disks: The list of all snapshots as returned by
- L{_CreateSnapshots}
- @type disk_index: number
- @param disk_index: Index of the snapshot to be removed
- @rtype: bool
- @return: Whether removal was successful or not
+ self.dest_x509_ca = cert
- """
- disk = snap_disks[disk_index]
- if disk:
- src_node = self.instance.primary_node
+ # Verify target information
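+      # op.target_node holds one (host, port, magic) tuple per instance disk,
+      # each verified with the cluster domain secret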
+ disk_info = []
+ for idx, disk_data in enumerate(self.op.target_node):
+ try:
+ (host, port, magic) = \
+ masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+ except errors.GenericError, err:
+ raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
+ errors.ECODE_INVAL)
- feedback_fn("Removing snapshot of disk/%s on node %s" %
- (disk_index, src_node))
+ disk_info.append((host, port, magic))
- result = self.rpc.call_blockdev_remove(src_node, disk)
- if not result.fail_msg:
- return True
+ assert len(disk_info) == len(self.op.target_node)
+ self.dest_disk_info = disk_info
- self.LogWarning("Could not remove snapshot for disk/%d from node"
- " %s: %s", disk_index, src_node, result.fail_msg)
+ else:
+ raise errors.ProgrammerError("Unhandled export mode %r" %
+ self.export_mode)
- return False
+ # instance disk type verification
+ # TODO: Implement export support for file-based disks
+ for disk in self.instance.disks:
+ if disk.dev_type == constants.LD_FILE:
+ raise errors.OpPrereqError("Export not supported for instances with"
+ " file-based disks", errors.ECODE_INVAL)
def _CleanupExports(self, feedback_fn):
"""Removes exports of current instance from all other nodes.
exports will be removed from the nodes A, B and D.
"""
+ assert self.export_mode != constants.EXPORT_MODE_REMOTE
+
nodelist = self.cfg.GetNodeList()
nodelist.remove(self.dst_node.name)
"""Export an instance to an image in the cluster.
"""
+ assert self.export_mode in constants.EXPORT_MODES
+
instance = self.instance
- dst_node = self.dst_node
src_node = instance.primary_node
if self.op.shutdown:
feedback_fn("Shutting down instance %s" % instance.name)
result = self.rpc.call_instance_shutdown(src_node, instance,
self.shutdown_timeout)
+ # TODO: Maybe ignore failures if ignore_remove_failures is set
result.Raise("Could not shutdown instance %s on"
" node %s" % (instance.name, src_node))
_StartInstanceDisks(self, instance, None)
try:
- # per-disk results
- dresults = []
- removed_snaps = [False] * len(instance.disks)
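+      # The export helper takes care of creating the disk snapshots,
+      # transferring the data and removing the snapshots again (see the
+      # "finally" clause below)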
+ helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
+ instance)
- snap_disks = None
+ helper.CreateSnapshots()
try:
- try:
- snap_disks = self._CreateSnapshots(feedback_fn)
- finally:
- if self.op.shutdown and instance.admin_up:
- feedback_fn("Starting instance %s" % instance.name)
- result = self.rpc.call_instance_start(src_node, instance,
- None, None)
- msg = result.fail_msg
- if msg:
- _ShutdownInstanceDisks(self, instance)
- raise errors.OpExecError("Could not start instance: %s" % msg)
-
- assert len(snap_disks) == len(instance.disks)
- assert len(removed_snaps) == len(instance.disks)
-
- # TODO: check for size
-
- cluster_name = self.cfg.GetClusterName()
- for idx, dev in enumerate(snap_disks):
- feedback_fn("Exporting snapshot %s from %s to %s" %
- (idx, src_node, dst_node.name))
- if dev:
- # FIXME: pass debug from opcode to backend
- result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
- instance, cluster_name,
- idx, self.op.debug_level)
- msg = result.fail_msg
- if msg:
- self.LogWarning("Could not export disk/%s from node %s to"
- " node %s: %s", idx, src_node, dst_node.name, msg)
- dresults.append(False)
- else:
- dresults.append(True)
+ if (self.op.shutdown and instance.admin_up and
+ not self.remove_instance):
+ assert not activate_disks
+ feedback_fn("Starting instance %s" % instance.name)
+ result = self.rpc.call_instance_start(src_node, instance, None, None)
+ msg = result.fail_msg
+ if msg:
+ feedback_fn("Failed to start instance: %s" % msg)
+ _ShutdownInstanceDisks(self, instance)
+ raise errors.OpExecError("Could not start instance: %s" % msg)
- # Remove snapshot
- if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
- removed_snaps[idx] = True
- else:
- dresults.append(False)
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+ elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+ connect_timeout = constants.RIE_CONNECT_TIMEOUT
+ timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
- assert len(dresults) == len(instance.disks)
+ (key_name, _, _) = self.x509_key_name
- # Check for backwards compatibility
- assert compat.all(isinstance(i, bool) for i in dresults), \
- "Not all results are boolean: %r" % dresults
-
- feedback_fn("Finalizing export on %s" % dst_node.name)
- result = self.rpc.call_finalize_export(dst_node.name, instance,
- snap_disks)
- msg = result.fail_msg
- fin_resu = not msg
- if msg:
- self.LogWarning("Could not finalize export for instance %s"
- " on node %s: %s", instance.name, dst_node.name, msg)
+ dest_ca_pem = \
+ OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ self.dest_x509_ca)
+ (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
+ key_name, dest_ca_pem,
+ timeouts)
finally:
- # Remove all snapshots
- assert len(removed_snaps) == len(instance.disks)
- for idx, removed in enumerate(removed_snaps):
- if not removed:
- self._RemoveSnapshot(feedback_fn, snap_disks, idx)
+ helper.Cleanup()
+
+ # Check for backwards compatibility
+ assert len(dresults) == len(instance.disks)
+ assert compat.all(isinstance(i, bool) for i in dresults), \
+ "Not all results are boolean: %r" % dresults
finally:
if activate_disks:
feedback_fn("Deactivating disks for %s" % instance.name)
_ShutdownInstanceDisks(self, instance)
- self._CleanupExports(feedback_fn)
+ # Remove instance if requested
+ if self.remove_instance:
+ if not (compat.all(dresults) and fin_resu):
+ feedback_fn("Not removing instance %s as parts of the export failed" %
+ instance.name)
+ else:
+ feedback_fn("Removing instance %s" % instance.name)
+ _RemoveInstance(self, feedback_fn, instance,
+ self.ignore_remove_failures)
+
+ if self.export_mode == constants.EXPORT_MODE_LOCAL:
+ self._CleanupExports(feedback_fn)
return fin_resu, dresults