4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement CheckPrereq which also fills in the opcode instance
52 with all the fields (even if as None)
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements (REQ_CLUSTER,
57 REQ_MASTER); note that all commands require root permissions
66 def __init__(self, processor, op, cfg, sstore):
67 """Constructor for LogicalUnit.
69 This needs to be overriden in derived classes in order to check op
79 for attr_name in self._OP_REQP:
80 attr_val = getattr(op, attr_name, None)
82 raise errors.OpPrereqError("Required parameter '%s' missing" %
85 if not cfg.IsCluster():
86 raise errors.OpPrereqError("Cluster not initialized yet,"
87 " use 'gnt-cluster init' first.")
89 master = sstore.GetMasterNode()
90 if master != utils.HostInfo().name:
91 raise errors.OpPrereqError("Commands must be run on the master"
95 """Returns the SshRunner object
99 self.__ssh = ssh.SshRunner(self.sstore)
102 ssh = property(fget=__GetSSH)
104 def CheckPrereq(self):
105 """Check prerequisites for this LU.
107 This method should check that the prerequisites for the execution
108 of this LU are fulfilled. It can do internode communication, but
109 it should be idempotent - no cluster or system changes are
112 The method should raise errors.OpPrereqError in case something is
113 not fulfilled. Its return value is ignored.
115 This method should also update all the parameters of the opcode to
116 their canonical form; e.g. a short node name must be fully
117 expanded after this method has successfully completed (so that
118 hooks, logging, etc. work correctly).
121 raise NotImplementedError
123 def Exec(self, feedback_fn):
126 This method should implement the actual work. It should raise
127 errors.OpExecError for failures that are somewhat dealt with in
131 raise NotImplementedError
133 def BuildHooksEnv(self):
134 """Build hooks environment for this LU.
136 This method should return a three-node tuple consisting of: a dict
137 containing the environment that will be used for running the
138 specific hook for this LU, a list of node names on which the hook
139 should run before the execution, and a list of node names on which
140 the hook should run after the execution.
142 The keys of the dict must not have 'GANETI_' prefixed as this will
143 be handled in the hooks runner. Also note additional keys will be
144 added by the hooks runner. If the LU doesn't define any
145 environment, an empty dict (and not None) should be returned.
147 As for the node lists, the master should not be included in the
148 them, as it will be added by the hooks runner in case this LU
149 requires a cluster to run on (otherwise we don't have a node
150 list). No nodes should be returned as an empty list (and not
153 Note that if the HPATH for a LU class is None, this function will
157 raise NotImplementedError
160 class NoHooksLU(LogicalUnit):
161 """Simple LU which runs no hooks.
163 This LU is intended as a parent for other LogicalUnits which will
164 run no hooks, in order to reduce duplicate code.
170 def BuildHooksEnv(self):
173 This is a no-op, since we don't run hooks.
179 def _AddHostToEtcHosts(hostname):
180 """Wrapper around utils.SetEtcHostsEntry.
183 hi = utils.HostInfo(name=hostname)
184 utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
187 def _RemoveHostFromEtcHosts(hostname):
188 """Wrapper around utils.RemoveEtcHostsEntry.
191 hi = utils.HostInfo(name=hostname)
192 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
193 utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
196 def _GetWantedNodes(lu, nodes):
197 """Returns list of checked and expanded node names.
200 nodes: List of nodes (strings) or None for all
203 if not isinstance(nodes, list):
204 raise errors.OpPrereqError("Invalid argument type 'nodes'")
210 node = lu.cfg.ExpandNodeName(name)
212 raise errors.OpPrereqError("No such node name '%s'" % name)
216 wanted = lu.cfg.GetNodeList()
217 return utils.NiceSort(wanted)
220 def _GetWantedInstances(lu, instances):
221 """Returns list of checked and expanded instance names.
224 instances: List of instances (strings) or None for all
227 if not isinstance(instances, list):
228 raise errors.OpPrereqError("Invalid argument type 'instances'")
233 for name in instances:
234 instance = lu.cfg.ExpandInstanceName(name)
236 raise errors.OpPrereqError("No such instance name '%s'" % name)
237 wanted.append(instance)
240 wanted = lu.cfg.GetInstanceList()
241 return utils.NiceSort(wanted)
244 def _CheckOutputFields(static, dynamic, selected):
245 """Checks whether all selected fields are valid.
248 static: Static fields
249 dynamic: Dynamic fields
252 static_fields = frozenset(static)
253 dynamic_fields = frozenset(dynamic)
255 all_fields = static_fields | dynamic_fields
257 if not all_fields.issuperset(selected):
258 raise errors.OpPrereqError("Unknown output fields selected: %s"
259 % ",".join(frozenset(selected).
260 difference(all_fields)))
263 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
264 memory, vcpus, nics):
265 """Builds instance related env variables for hooks from single variables.
268 secondary_nodes: List of secondary nodes as strings
272 "INSTANCE_NAME": name,
273 "INSTANCE_PRIMARY": primary_node,
274 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
275 "INSTANCE_OS_TYPE": os_type,
276 "INSTANCE_STATUS": status,
277 "INSTANCE_MEMORY": memory,
278 "INSTANCE_VCPUS": vcpus,
282 nic_count = len(nics)
283 for idx, (ip, bridge, mac) in enumerate(nics):
286 env["INSTANCE_NIC%d_IP" % idx] = ip
287 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
288 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
292 env["INSTANCE_NIC_COUNT"] = nic_count
297 def _BuildInstanceHookEnvByObject(instance, override=None):
298 """Builds instance related env variables for hooks from an object.
301 instance: objects.Instance object of instance
302 override: dict of values to override
305 'name': instance.name,
306 'primary_node': instance.primary_node,
307 'secondary_nodes': instance.secondary_nodes,
308 'os_type': instance.os,
309 'status': instance.os,
310 'memory': instance.memory,
311 'vcpus': instance.vcpus,
312 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
315 args.update(override)
316 return _BuildInstanceHookEnv(**args)
319 def _HasValidVG(vglist, vgname):
320 """Checks if the volume group list is valid.
322 A non-None return value means there's an error, and the return value
323 is the error message.
326 vgsize = vglist.get(vgname, None)
328 return "volume group '%s' missing" % vgname
330 return ("volume group '%s' too small (20480MiB required, %dMib found)" %
335 def _InitSSHSetup(node):
336 """Setup the SSH configuration for the cluster.
339 This generates a dsa keypair for root, adds the pub key to the
340 permitted hosts and adds the hostkey to its own known hosts.
343 node: the name of this host as a fqdn
346 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
348 for name in priv_key, pub_key:
349 if os.path.exists(name):
350 utils.CreateBackup(name)
351 utils.RemoveFile(name)
353 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
357 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
360 f = open(pub_key, 'r')
362 utils.AddAuthorizedKey(auth_keys, f.read(8192))
367 def _InitGanetiServerSetup(ss):
368 """Setup the necessary configuration for the initial node daemon.
370 This creates the nodepass file containing the shared password for
371 the cluster and also generates the SSL certificate.
374 # Create pseudo random password
375 randpass = sha.new(os.urandom(64)).hexdigest()
376 # and write it into sstore
377 ss.SetKey(ss.SS_NODED_PASS, randpass)
379 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
380 "-days", str(365*5), "-nodes", "-x509",
381 "-keyout", constants.SSL_CERT_FILE,
382 "-out", constants.SSL_CERT_FILE, "-batch"])
384 raise errors.OpExecError("could not generate server ssl cert, command"
385 " %s had exitcode %s and error message %s" %
386 (result.cmd, result.exit_code, result.output))
388 os.chmod(constants.SSL_CERT_FILE, 0400)
390 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
393 raise errors.OpExecError("Could not start the node daemon, command %s"
394 " had exitcode %s and error %s" %
395 (result.cmd, result.exit_code, result.output))
398 def _CheckInstanceBridgesExist(instance):
399 """Check that the brigdes needed by an instance exist.
402 # check bridges existance
403 brlist = [nic.bridge for nic in instance.nics]
404 if not rpc.call_bridges_exist(instance.primary_node, brlist):
405 raise errors.OpPrereqError("one or more target bridges %s does not"
406 " exist on destination node '%s'" %
407 (brlist, instance.primary_node))
410 class LUInitCluster(LogicalUnit):
411 """Initialise the cluster.
414 HPATH = "cluster-init"
415 HTYPE = constants.HTYPE_CLUSTER
416 _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
417 "def_bridge", "master_netdev", "file_storage_dir"]
420 def BuildHooksEnv(self):
423 Notes: Since we don't require a cluster, we must manually add
424 ourselves in the post-run node list.
427 env = {"OP_TARGET": self.op.cluster_name}
428 return env, [], [self.hostname.name]
430 def CheckPrereq(self):
431 """Verify that the passed name is a valid one.
434 if config.ConfigWriter.IsCluster():
435 raise errors.OpPrereqError("Cluster is already initialised")
437 if self.op.hypervisor_type == constants.HT_XEN_HVM31:
438 if not os.path.exists(constants.VNC_PASSWORD_FILE):
439 raise errors.OpPrereqError("Please prepare the cluster VNC"
441 constants.VNC_PASSWORD_FILE)
443 self.hostname = hostname = utils.HostInfo()
445 if hostname.ip.startswith("127."):
446 raise errors.OpPrereqError("This host's IP resolves to the private"
447 " range (%s). Please fix DNS or %s." %
448 (hostname.ip, constants.ETC_HOSTS))
450 if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
451 source=constants.LOCALHOST_IP_ADDRESS):
452 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
453 " to %s,\nbut this ip address does not"
454 " belong to this host."
455 " Aborting." % hostname.ip)
457 self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
459 if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
461 raise errors.OpPrereqError("Cluster IP already active. Aborting.")
463 secondary_ip = getattr(self.op, "secondary_ip", None)
464 if secondary_ip and not utils.IsValidIP(secondary_ip):
465 raise errors.OpPrereqError("Invalid secondary ip given")
467 secondary_ip != hostname.ip and
468 (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
469 source=constants.LOCALHOST_IP_ADDRESS))):
470 raise errors.OpPrereqError("You gave %s as secondary IP,"
471 " but it does not belong to this host." %
473 self.secondary_ip = secondary_ip
475 if not hasattr(self.op, "vg_name"):
476 self.op.vg_name = None
477 # if vg_name not None, checks if volume group is valid
479 vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
481 raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
482 " you are not using lvm" % vgstatus)
484 self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
486 if not os.path.isabs(self.op.file_storage_dir):
487 raise errors.OpPrereqError("The file storage directory you have is"
488 " not an absolute path.")
490 if not os.path.exists(self.op.file_storage_dir):
492 os.makedirs(self.op.file_storage_dir, 0750)
494 raise errors.OpPrereqError("Cannot create file storage directory"
496 (self.op.file_storage_dir, err))
498 if not os.path.isdir(self.op.file_storage_dir):
499 raise errors.OpPrereqError("The file storage directory '%s' is not"
500 " a directory." % self.op.file_storage_dir)
502 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
504 raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
507 if self.op.hypervisor_type not in constants.HYPER_TYPES:
508 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
509 self.op.hypervisor_type)
511 result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
513 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
514 (self.op.master_netdev,
515 result.output.strip()))
517 if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
518 os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
519 raise errors.OpPrereqError("Init.d script '%s' missing or not"
520 " executable." % constants.NODE_INITD_SCRIPT)
522 def Exec(self, feedback_fn):
523 """Initialize the cluster.
526 clustername = self.clustername
527 hostname = self.hostname
529 # set up the simple store
530 self.sstore = ss = ssconf.SimpleStore()
531 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
532 ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
533 ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
534 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
535 ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
536 ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
538 # set up the inter-node password and certificate
539 _InitGanetiServerSetup(ss)
541 # start the master ip
542 rpc.call_node_start_master(hostname.name)
544 # set up ssh config and /etc/hosts
545 f = open(constants.SSH_HOST_RSA_PUB, 'r')
550 sshkey = sshline.split(" ")[1]
552 _AddHostToEtcHosts(hostname.name)
553 _InitSSHSetup(hostname.name)
555 # init of cluster config file
556 self.cfg = cfgw = config.ConfigWriter()
557 cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
558 sshkey, self.op.mac_prefix,
559 self.op.vg_name, self.op.def_bridge)
561 ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
564 class LUDestroyCluster(NoHooksLU):
565 """Logical unit for destroying the cluster.
570 def CheckPrereq(self):
571 """Check prerequisites.
573 This checks whether the cluster is empty.
575 Any errors are signalled by raising errors.OpPrereqError.
578 master = self.sstore.GetMasterNode()
580 nodelist = self.cfg.GetNodeList()
581 if len(nodelist) != 1 or nodelist[0] != master:
582 raise errors.OpPrereqError("There are still %d node(s) in"
583 " this cluster." % (len(nodelist) - 1))
584 instancelist = self.cfg.GetInstanceList()
586 raise errors.OpPrereqError("There are still %d instance(s) in"
587 " this cluster." % len(instancelist))
589 def Exec(self, feedback_fn):
590 """Destroys the cluster.
593 master = self.sstore.GetMasterNode()
594 if not rpc.call_node_stop_master(master):
595 raise errors.OpExecError("Could not disable the master role")
596 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
597 utils.CreateBackup(priv_key)
598 utils.CreateBackup(pub_key)
599 rpc.call_node_leave_cluster(master)
602 class LUVerifyCluster(NoHooksLU):
603 """Verifies the cluster status.
608 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
609 remote_version, feedback_fn):
610 """Run multiple tests against a node.
613 - compares ganeti version
614 - checks vg existance and size > 20G
615 - checks config file checksum
616 - checks ssh to other nodes
619 node: name of the node to check
620 file_list: required list of files
621 local_cksum: dictionary of local files and their checksums
624 # compares ganeti version
625 local_version = constants.PROTOCOL_VERSION
626 if not remote_version:
627 feedback_fn(" - ERROR: connection to %s failed" % (node))
630 if local_version != remote_version:
631 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
632 (local_version, node, remote_version))
635 # checks vg existance and size > 20G
639 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
643 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
645 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
648 # checks config file checksum
651 if 'filelist' not in node_result:
653 feedback_fn(" - ERROR: node hasn't returned file checksum data")
655 remote_cksum = node_result['filelist']
656 for file_name in file_list:
657 if file_name not in remote_cksum:
659 feedback_fn(" - ERROR: file '%s' missing" % file_name)
660 elif remote_cksum[file_name] != local_cksum[file_name]:
662 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
664 if 'nodelist' not in node_result:
666 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
668 if node_result['nodelist']:
670 for node in node_result['nodelist']:
671 feedback_fn(" - ERROR: communication with node '%s': %s" %
672 (node, node_result['nodelist'][node]))
673 hyp_result = node_result.get('hypervisor', None)
674 if hyp_result is not None:
675 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
678 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
679 """Verify an instance.
681 This function checks to see if the required block devices are
682 available on the instance's node.
687 instancelist = self.cfg.GetInstanceList()
688 if not instance in instancelist:
689 feedback_fn(" - ERROR: instance %s not in instance list %s" %
690 (instance, instancelist))
693 instanceconfig = self.cfg.GetInstanceInfo(instance)
694 node_current = instanceconfig.primary_node
697 instanceconfig.MapLVsByNode(node_vol_should)
699 for node in node_vol_should:
700 for volume in node_vol_should[node]:
701 if node not in node_vol_is or volume not in node_vol_is[node]:
702 feedback_fn(" - ERROR: volume %s missing on node %s" %
706 if not instanceconfig.status == 'down':
707 if not instance in node_instance[node_current]:
708 feedback_fn(" - ERROR: instance %s not running on node %s" %
709 (instance, node_current))
712 for node in node_instance:
713 if (not node == node_current):
714 if instance in node_instance[node]:
715 feedback_fn(" - ERROR: instance %s should not run on node %s" %
721 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
722 """Verify if there are any unknown volumes in the cluster.
724 The .os, .swap and backup volumes are ignored. All other volumes are
730 for node in node_vol_is:
731 for volume in node_vol_is[node]:
732 if node not in node_vol_should or volume not in node_vol_should[node]:
733 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
738 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
739 """Verify the list of running instances.
741 This checks what instances are running but unknown to the cluster.
745 for node in node_instance:
746 for runninginstance in node_instance[node]:
747 if runninginstance not in instancelist:
748 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
749 (runninginstance, node))
753 def CheckPrereq(self):
754 """Check prerequisites.
756 This has no prerequisites.
761 def Exec(self, feedback_fn):
762 """Verify integrity of cluster, performing various test on nodes.
766 feedback_fn("* Verifying global settings")
767 for msg in self.cfg.VerifyConfig():
768 feedback_fn(" - ERROR: %s" % msg)
770 vg_name = self.cfg.GetVGName()
771 nodelist = utils.NiceSort(self.cfg.GetNodeList())
772 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
776 # FIXME: verify OS list
778 file_names = list(self.sstore.GetFileList())
779 file_names.append(constants.SSL_CERT_FILE)
780 file_names.append(constants.CLUSTER_CONF_FILE)
781 local_checksums = utils.FingerprintFiles(file_names)
783 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
784 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
785 all_instanceinfo = rpc.call_instance_list(nodelist)
786 all_vglist = rpc.call_vg_list(nodelist)
787 node_verify_param = {
788 'filelist': file_names,
789 'nodelist': nodelist,
792 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
793 all_rversion = rpc.call_version(nodelist)
795 for node in nodelist:
796 feedback_fn("* Verifying node %s" % node)
797 result = self._VerifyNode(node, file_names, local_checksums,
798 all_vglist[node], all_nvinfo[node],
799 all_rversion[node], feedback_fn)
803 volumeinfo = all_volumeinfo[node]
805 if isinstance(volumeinfo, basestring):
806 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
807 (node, volumeinfo[-400:].encode('string_escape')))
809 node_volume[node] = {}
810 elif not isinstance(volumeinfo, dict):
811 feedback_fn(" - ERROR: connection to %s failed" % (node,))
815 node_volume[node] = volumeinfo
818 nodeinstance = all_instanceinfo[node]
819 if type(nodeinstance) != list:
820 feedback_fn(" - ERROR: connection to %s failed" % (node,))
824 node_instance[node] = nodeinstance
828 for instance in instancelist:
829 feedback_fn("* Verifying instance %s" % instance)
830 result = self._VerifyInstance(instance, node_volume, node_instance,
834 inst_config = self.cfg.GetInstanceInfo(instance)
836 inst_config.MapLVsByNode(node_vol_should)
838 feedback_fn("* Verifying orphan volumes")
839 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
843 feedback_fn("* Verifying remaining instances")
844 result = self._VerifyOrphanInstances(instancelist, node_instance,
851 class LUVerifyDisks(NoHooksLU):
852 """Verifies the cluster disks status.
857 def CheckPrereq(self):
858 """Check prerequisites.
860 This has no prerequisites.
865 def Exec(self, feedback_fn):
866 """Verify integrity of cluster disks.
869 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
871 vg_name = self.cfg.GetVGName()
872 nodes = utils.NiceSort(self.cfg.GetNodeList())
873 instances = [self.cfg.GetInstanceInfo(name)
874 for name in self.cfg.GetInstanceList()]
877 for inst in instances:
879 if (inst.status != "up" or
880 inst.disk_template not in constants.DTS_NET_MIRROR):
882 inst.MapLVsByNode(inst_lvs)
883 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
884 for node, vol_list in inst_lvs.iteritems():
886 nv_dict[(node, vol)] = inst
891 node_lvs = rpc.call_volume_list(nodes, vg_name)
898 if isinstance(lvs, basestring):
899 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
901 elif not isinstance(lvs, dict):
902 logger.Info("connection to node %s failed or invalid data returned" %
904 res_nodes.append(node)
907 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
908 inst = nv_dict.pop((node, lv_name), None)
909 if (not lv_online and inst is not None
910 and inst.name not in res_instances):
911 res_instances.append(inst.name)
913 # any leftover items in nv_dict are missing LVs, let's arrange the
915 for key, inst in nv_dict.iteritems():
916 if inst.name not in res_missing:
917 res_missing[inst.name] = []
918 res_missing[inst.name].append(key)
923 class LURenameCluster(LogicalUnit):
924 """Rename the cluster.
927 HPATH = "cluster-rename"
928 HTYPE = constants.HTYPE_CLUSTER
931 def BuildHooksEnv(self):
936 "OP_TARGET": self.sstore.GetClusterName(),
937 "NEW_NAME": self.op.name,
939 mn = self.sstore.GetMasterNode()
940 return env, [mn], [mn]
942 def CheckPrereq(self):
943 """Verify that the passed name is a valid one.
946 hostname = utils.HostInfo(self.op.name)
948 new_name = hostname.name
949 self.ip = new_ip = hostname.ip
950 old_name = self.sstore.GetClusterName()
951 old_ip = self.sstore.GetMasterIP()
952 if new_name == old_name and new_ip == old_ip:
953 raise errors.OpPrereqError("Neither the name nor the IP address of the"
954 " cluster has changed")
956 result = utils.RunCmd(["fping", "-q", new_ip])
957 if not result.failed:
958 raise errors.OpPrereqError("The given cluster IP address (%s) is"
959 " reachable on the network. Aborting." %
962 self.op.name = new_name
964 def Exec(self, feedback_fn):
965 """Rename the cluster.
968 clustername = self.op.name
972 # shutdown the master IP
973 master = ss.GetMasterNode()
974 if not rpc.call_node_stop_master(master):
975 raise errors.OpExecError("Could not disable the master role")
979 ss.SetKey(ss.SS_MASTER_IP, ip)
980 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
982 # Distribute updated ss config to all nodes
983 myself = self.cfg.GetNodeInfo(master)
984 dist_nodes = self.cfg.GetNodeList()
985 if myself.name in dist_nodes:
986 dist_nodes.remove(myself.name)
988 logger.Debug("Copying updated ssconf data to all nodes")
989 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
990 fname = ss.KeyToFilename(keyname)
991 result = rpc.call_upload_file(dist_nodes, fname)
992 for to_node in dist_nodes:
993 if not result[to_node]:
994 logger.Error("copy of file %s to node %s failed" %
997 if not rpc.call_node_start_master(master):
998 logger.Error("Could not re-enable the master role on the master,"
999 " please restart manually.")
1002 def _RecursiveCheckIfLVMBased(disk):
1003 """Check if the given disk or its children are lvm-based.
1006 disk: ganeti.objects.Disk object
1009 boolean indicating whether a LD_LV dev_type was found or not
1013 for chdisk in disk.children:
1014 if _RecursiveCheckIfLVMBased(chdisk):
1016 return disk.dev_type == constants.LD_LV
1019 class LUSetClusterParams(LogicalUnit):
1020 """Change the parameters of the cluster.
1023 HPATH = "cluster-modify"
1024 HTYPE = constants.HTYPE_CLUSTER
1027 def BuildHooksEnv(self):
1032 "OP_TARGET": self.sstore.GetClusterName(),
1033 "NEW_VG_NAME": self.op.vg_name,
1035 mn = self.sstore.GetMasterNode()
1036 return env, [mn], [mn]
1038 def CheckPrereq(self):
1039 """Check prerequisites.
1041 This checks whether the given params don't conflict and
1042 if the given volume group is valid.
1045 if not self.op.vg_name:
1046 instances = [self.cfg.GetInstanceInfo(name)
1047 for name in self.cfg.GetInstanceList()]
1048 for inst in instances:
1049 for disk in inst.disks:
1050 if _RecursiveCheckIfLVMBased(disk):
1051 raise errors.OpPrereqError("Cannot disable lvm storage while"
1052 " lvm-based instances exist")
1054 # if vg_name not None, checks given volume group on all nodes
1056 node_list = self.cfg.GetNodeList()
1057 vglist = rpc.call_vg_list(node_list)
1058 for node in node_list:
1059 vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
1061 raise errors.OpPrereqError("Error on node '%s': %s" %
1064 def Exec(self, feedback_fn):
1065 """Change the parameters of the cluster.
1068 if self.op.vg_name != self.cfg.GetVGName():
1069 self.cfg.SetVGName(self.op.vg_name)
1071 feedback_fn("Cluster LVM configuration already in desired"
1072 " state, not changing")
1075 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1076 """Sleep and poll for an instance's disk to sync.
1079 if not instance.disks:
1083 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1085 node = instance.primary_node
1087 for dev in instance.disks:
1088 cfgw.SetDiskID(dev, node)
1094 cumul_degraded = False
1095 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1097 proc.LogWarning("Can't get any data from node %s" % node)
1100 raise errors.RemoteError("Can't contact node %s for mirror data,"
1101 " aborting." % node)
1105 for i in range(len(rstats)):
1108 proc.LogWarning("Can't compute data for node %s/%s" %
1109 (node, instance.disks[i].iv_name))
1111 # we ignore the ldisk parameter
1112 perc_done, est_time, is_degraded, _ = mstat
1113 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1114 if perc_done is not None:
1116 if est_time is not None:
1117 rem_time = "%d estimated seconds remaining" % est_time
1120 rem_time = "no time estimate"
1121 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1122 (instance.disks[i].iv_name, perc_done, rem_time))
1129 time.sleep(min(60, max_time))
1135 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1136 return not cumul_degraded
1139 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1140 """Check that mirrors are not degraded.
1142 The ldisk parameter, if True, will change the test from the
1143 is_degraded attribute (which represents overall non-ok status for
1144 the device(s)) to the ldisk (representing the local storage status).
1147 cfgw.SetDiskID(dev, node)
1154 if on_primary or dev.AssembleOnSecondary():
1155 rstats = rpc.call_blockdev_find(node, dev)
1157 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1160 result = result and (not rstats[idx])
1162 for child in dev.children:
1163 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1168 class LUDiagnoseOS(NoHooksLU):
1169 """Logical unit for OS diagnose/query.
1174 def CheckPrereq(self):
1175 """Check prerequisites.
1177 This always succeeds, since this is a pure query LU.
1182 def Exec(self, feedback_fn):
1183 """Compute the list of OSes.
1186 node_list = self.cfg.GetNodeList()
1187 node_data = rpc.call_os_diagnose(node_list)
1188 if node_data == False:
1189 raise errors.OpExecError("Can't gather the list of OSes")
1193 class LURemoveNode(LogicalUnit):
1194 """Logical unit for removing a node.
1197 HPATH = "node-remove"
1198 HTYPE = constants.HTYPE_NODE
1199 _OP_REQP = ["node_name"]
1201 def BuildHooksEnv(self):
1204 This doesn't run on the target node in the pre phase as a failed
1205 node would not allows itself to run.
1209 "OP_TARGET": self.op.node_name,
1210 "NODE_NAME": self.op.node_name,
1212 all_nodes = self.cfg.GetNodeList()
1213 all_nodes.remove(self.op.node_name)
1214 return env, all_nodes, all_nodes
1216 def CheckPrereq(self):
1217 """Check prerequisites.
1220 - the node exists in the configuration
1221 - it does not have primary or secondary instances
1222 - it's not the master
1224 Any errors are signalled by raising errors.OpPrereqError.
1227 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1229 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1231 instance_list = self.cfg.GetInstanceList()
1233 masternode = self.sstore.GetMasterNode()
1234 if node.name == masternode:
1235 raise errors.OpPrereqError("Node is the master node,"
1236 " you need to failover first.")
1238 for instance_name in instance_list:
1239 instance = self.cfg.GetInstanceInfo(instance_name)
1240 if node.name == instance.primary_node:
1241 raise errors.OpPrereqError("Instance %s still running on the node,"
1242 " please remove first." % instance_name)
1243 if node.name in instance.secondary_nodes:
1244 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1245 " please remove first." % instance_name)
1246 self.op.node_name = node.name
1249 def Exec(self, feedback_fn):
1250 """Removes the node from the cluster.
1254 logger.Info("stopping the node daemon and removing configs from node %s" %
1257 rpc.call_node_leave_cluster(node.name)
1259 self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1261 logger.Info("Removing node %s from config" % node.name)
1263 self.cfg.RemoveNode(node.name)
1265 _RemoveHostFromEtcHosts(node.name)
1268 class LUQueryNodes(NoHooksLU):
1269 """Logical unit for querying nodes.
1272 _OP_REQP = ["output_fields", "names"]
1274 def CheckPrereq(self):
1275 """Check prerequisites.
1277 This checks that the fields required are valid output fields.
1280 self.dynamic_fields = frozenset(["dtotal", "dfree",
1281 "mtotal", "mnode", "mfree",
1284 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1285 "pinst_list", "sinst_list",
1287 dynamic=self.dynamic_fields,
1288 selected=self.op.output_fields)
1290 self.wanted = _GetWantedNodes(self, self.op.names)
1292 def Exec(self, feedback_fn):
1293 """Computes the list of nodes and their attributes.
1296 nodenames = self.wanted
1297 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1299 # begin data gathering
1301 if self.dynamic_fields.intersection(self.op.output_fields):
1303 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1304 for name in nodenames:
1305 nodeinfo = node_data.get(name, None)
1308 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1309 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1310 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1311 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1312 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1313 "bootid": nodeinfo['bootid'],
1316 live_data[name] = {}
1318 live_data = dict.fromkeys(nodenames, {})
1320 node_to_primary = dict([(name, set()) for name in nodenames])
1321 node_to_secondary = dict([(name, set()) for name in nodenames])
1323 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1324 "sinst_cnt", "sinst_list"))
1325 if inst_fields & frozenset(self.op.output_fields):
1326 instancelist = self.cfg.GetInstanceList()
1328 for instance_name in instancelist:
1329 inst = self.cfg.GetInstanceInfo(instance_name)
1330 if inst.primary_node in node_to_primary:
1331 node_to_primary[inst.primary_node].add(inst.name)
1332 for secnode in inst.secondary_nodes:
1333 if secnode in node_to_secondary:
1334 node_to_secondary[secnode].add(inst.name)
1336 # end data gathering
1339 for node in nodelist:
1341 for field in self.op.output_fields:
1344 elif field == "pinst_list":
1345 val = list(node_to_primary[node.name])
1346 elif field == "sinst_list":
1347 val = list(node_to_secondary[node.name])
1348 elif field == "pinst_cnt":
1349 val = len(node_to_primary[node.name])
1350 elif field == "sinst_cnt":
1351 val = len(node_to_secondary[node.name])
1352 elif field == "pip":
1353 val = node.primary_ip
1354 elif field == "sip":
1355 val = node.secondary_ip
1356 elif field in self.dynamic_fields:
1357 val = live_data[node.name].get(field, None)
1359 raise errors.ParameterError(field)
1360 node_output.append(val)
1361 output.append(node_output)
1366 class LUQueryNodeVolumes(NoHooksLU):
1367 """Logical unit for getting volumes on node(s).
1370 _OP_REQP = ["nodes", "output_fields"]
1372 def CheckPrereq(self):
1373 """Check prerequisites.
1375 This checks that the fields required are valid output fields.
1378 self.nodes = _GetWantedNodes(self, self.op.nodes)
1380 _CheckOutputFields(static=["node"],
1381 dynamic=["phys", "vg", "name", "size", "instance"],
1382 selected=self.op.output_fields)
1385 def Exec(self, feedback_fn):
1386 """Computes the list of nodes and their attributes.
1389 nodenames = self.nodes
1390 volumes = rpc.call_node_volumes(nodenames)
1392 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1393 in self.cfg.GetInstanceList()]
1395 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1398 for node in nodenames:
1399 if node not in volumes or not volumes[node]:
1402 node_vols = volumes[node][:]
1403 node_vols.sort(key=lambda vol: vol['dev'])
1405 for vol in node_vols:
1407 for field in self.op.output_fields:
1410 elif field == "phys":
1414 elif field == "name":
1416 elif field == "size":
1417 val = int(float(vol['size']))
1418 elif field == "instance":
1420 if node not in lv_by_node[inst]:
1422 if vol['name'] in lv_by_node[inst][node]:
1428 raise errors.ParameterError(field)
1429 node_output.append(str(val))
1431 output.append(node_output)
1436 class LUAddNode(LogicalUnit):
1437 """Logical unit for adding node to the cluster.
1441 HTYPE = constants.HTYPE_NODE
1442 _OP_REQP = ["node_name"]
1444 def BuildHooksEnv(self):
1447 This will run on all nodes before, and on all nodes + the new node after.
1451 "OP_TARGET": self.op.node_name,
1452 "NODE_NAME": self.op.node_name,
1453 "NODE_PIP": self.op.primary_ip,
1454 "NODE_SIP": self.op.secondary_ip,
1456 nodes_0 = self.cfg.GetNodeList()
1457 nodes_1 = nodes_0 + [self.op.node_name, ]
1458 return env, nodes_0, nodes_1
1460 def CheckPrereq(self):
1461 """Check prerequisites.
1464 - the new node is not already in the config
1466 - its parameters (single/dual homed) matches the cluster
1468 Any errors are signalled by raising errors.OpPrereqError.
1471 node_name = self.op.node_name
1474 dns_data = utils.HostInfo(node_name)
1476 node = dns_data.name
1477 primary_ip = self.op.primary_ip = dns_data.ip
1478 secondary_ip = getattr(self.op, "secondary_ip", None)
1479 if secondary_ip is None:
1480 secondary_ip = primary_ip
1481 if not utils.IsValidIP(secondary_ip):
1482 raise errors.OpPrereqError("Invalid secondary IP given")
1483 self.op.secondary_ip = secondary_ip
1484 node_list = cfg.GetNodeList()
1485 if node in node_list:
1486 raise errors.OpPrereqError("Node %s is already in the configuration"
1489 for existing_node_name in node_list:
1490 existing_node = cfg.GetNodeInfo(existing_node_name)
1491 if (existing_node.primary_ip == primary_ip or
1492 existing_node.secondary_ip == primary_ip or
1493 existing_node.primary_ip == secondary_ip or
1494 existing_node.secondary_ip == secondary_ip):
1495 raise errors.OpPrereqError("New node ip address(es) conflict with"
1496 " existing node %s" % existing_node.name)
1498 # check that the type of the node (single versus dual homed) is the
1499 # same as for the master
1500 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501 master_singlehomed = myself.secondary_ip == myself.primary_ip
1502 newbie_singlehomed = secondary_ip == primary_ip
1503 if master_singlehomed != newbie_singlehomed:
1504 if master_singlehomed:
1505 raise errors.OpPrereqError("The master has no private ip but the"
1506 " new node has one")
1508 raise errors.OpPrereqError("The master has a private ip but the"
1509 " new node doesn't have one")
1511 # checks reachablity
1512 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513 raise errors.OpPrereqError("Node not reachable by ping")
1515 if not newbie_singlehomed:
1516 # check reachability from my secondary ip to newbie's secondary ip
1517 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518 source=myself.secondary_ip):
1519 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520 " based ping to noded port")
1522 self.new_node = objects.Node(name=node,
1523 primary_ip=primary_ip,
1524 secondary_ip=secondary_ip)
1526 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529 constants.VNC_PASSWORD_FILE)
1531 def Exec(self, feedback_fn):
1532 """Adds the new node to the cluster.
1535 new_node = self.new_node
1536 node = new_node.name
1538 # set up inter-node password and certificate and restarts the node daemon
1539 gntpass = self.sstore.GetNodeDaemonPassword()
1540 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541 raise errors.OpExecError("ganeti password corruption detected")
1542 f = open(constants.SSL_CERT_FILE)
1544 gntpem = f.read(8192)
1547 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548 # so we use this to detect an invalid certificate; as long as the
1549 # cert doesn't contain this, the here-document will be correctly
1550 # parsed by the shell sequence below
1551 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553 if not gntpem.endswith("\n"):
1554 raise errors.OpExecError("PEM must end with newline")
1555 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1557 # and then connect with ssh to set password and start ganeti-noded
1558 # note that all the below variables are sanitized at this point,
1559 # either by being constants or by the checks above
1561 mycommand = ("umask 077 && "
1562 "echo '%s' > '%s' && "
1563 "cat > '%s' << '!EOF.' && \n"
1564 "%s!EOF.\n%s restart" %
1565 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566 constants.SSL_CERT_FILE, gntpem,
1567 constants.NODE_INITD_SCRIPT))
1569 result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1571 raise errors.OpExecError("Remote command on node %s, error: %s,"
1573 (node, result.fail_reason, result.output))
1575 # check connectivity
1578 result = rpc.call_version([node])[node]
1580 if constants.PROTOCOL_VERSION == result:
1581 logger.Info("communication to node %s fine, sw version %s match" %
1584 raise errors.OpExecError("Version mismatch master version %s,"
1585 " node version %s" %
1586 (constants.PROTOCOL_VERSION, result))
1588 raise errors.OpExecError("Cannot get version from the new node")
1591 logger.Info("copy ssh key to node %s" % node)
1592 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1594 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1601 keyarray.append(f.read())
1605 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606 keyarray[3], keyarray[4], keyarray[5])
1609 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1611 # Add node to our /etc/hosts, and add key to known_hosts
1612 _AddHostToEtcHosts(new_node.name)
1614 if new_node.secondary_ip != new_node.primary_ip:
1615 if not rpc.call_node_tcp_ping(new_node.name,
1616 constants.LOCALHOST_IP_ADDRESS,
1617 new_node.secondary_ip,
1618 constants.DEFAULT_NODED_PORT,
1620 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621 " you gave (%s). Please fix and re-run this"
1622 " command." % new_node.secondary_ip)
1624 success, msg = self.ssh.VerifyNodeHostname(node)
1626 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627 " than the one the resolver gives: %s."
1628 " Please fix and re-run this command." %
1631 # Distribute updated /etc/hosts and known_hosts to all nodes,
1632 # including the node just added
1633 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634 dist_nodes = self.cfg.GetNodeList() + [node]
1635 if myself.name in dist_nodes:
1636 dist_nodes.remove(myself.name)
1638 logger.Debug("Copying hosts and known_hosts to all nodes")
1639 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1640 result = rpc.call_upload_file(dist_nodes, fname)
1641 for to_node in dist_nodes:
1642 if not result[to_node]:
1643 logger.Error("copy of file %s to node %s failed" %
1646 to_copy = ss.GetFileList()
1647 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1648 to_copy.append(constants.VNC_PASSWORD_FILE)
1649 for fname in to_copy:
1650 if not self.ssh.CopyFileToNode(node, fname):
1651 logger.Error("could not copy file %s to node %s" % (fname, node))
1653 logger.Info("adding node %s to cluster.conf" % node)
1654 self.cfg.AddNode(new_node)
1657 class LUMasterFailover(LogicalUnit):
1658 """Failover the master node to the current node.
1660 This is a special LU in that it must run on a non-master node.
1663 HPATH = "master-failover"
1664 HTYPE = constants.HTYPE_CLUSTER
1668 def BuildHooksEnv(self):
1671 This will run on the new master only in the pre phase, and on all
1672 the nodes in the post phase.
1676 "OP_TARGET": self.new_master,
1677 "NEW_MASTER": self.new_master,
1678 "OLD_MASTER": self.old_master,
1680 return env, [self.new_master], self.cfg.GetNodeList()
1682 def CheckPrereq(self):
1683 """Check prerequisites.
1685 This checks that we are not already the master.
1688 self.new_master = utils.HostInfo().name
1689 self.old_master = self.sstore.GetMasterNode()
1691 if self.old_master == self.new_master:
1692 raise errors.OpPrereqError("This commands must be run on the node"
1693 " where you want the new master to be."
1694 " %s is already the master" %
1697 def Exec(self, feedback_fn):
1698 """Failover the master node.
1700 This command, when run on a non-master node, will cause the current
1701 master to cease being master, and the non-master to become new
1705 #TODO: do not rely on gethostname returning the FQDN
1706 logger.Info("setting master to %s, old master: %s" %
1707 (self.new_master, self.old_master))
1709 if not rpc.call_node_stop_master(self.old_master):
1710 logger.Error("could disable the master role on the old master"
1711 " %s, please disable manually" % self.old_master)
1714 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1715 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1716 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1717 logger.Error("could not distribute the new simple store master file"
1718 " to the other nodes, please check.")
1720 if not rpc.call_node_start_master(self.new_master):
1721 logger.Error("could not start the master role on the new master"
1722 " %s, please check" % self.new_master)
1723 feedback_fn("Error in activating the master IP on the new master,"
1724 " please fix manually.")
1728 class LUQueryClusterInfo(NoHooksLU):
1729 """Query cluster configuration.
1735 def CheckPrereq(self):
1736 """No prerequsites needed for this LU.
1741 def Exec(self, feedback_fn):
1742 """Return cluster config.
1746 "name": self.sstore.GetClusterName(),
1747 "software_version": constants.RELEASE_VERSION,
1748 "protocol_version": constants.PROTOCOL_VERSION,
1749 "config_version": constants.CONFIG_VERSION,
1750 "os_api_version": constants.OS_API_VERSION,
1751 "export_version": constants.EXPORT_VERSION,
1752 "master": self.sstore.GetMasterNode(),
1753 "architecture": (platform.architecture()[0], platform.machine()),
1759 class LUClusterCopyFile(NoHooksLU):
1760 """Copy file to cluster.
1763 _OP_REQP = ["nodes", "filename"]
1765 def CheckPrereq(self):
1766 """Check prerequisites.
1768 It should check that the named file exists and that the given list
1772 if not os.path.exists(self.op.filename):
1773 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1775 self.nodes = _GetWantedNodes(self, self.op.nodes)
1777 def Exec(self, feedback_fn):
1778 """Copy a file from master to some nodes.
1781 opts - class with options as members
1782 args - list containing a single element, the file name
1784 nodes - list containing the name of target nodes; if empty, all nodes
1787 filename = self.op.filename
1789 myname = utils.HostInfo().name
1791 for node in self.nodes:
1794 if not self.ssh.CopyFileToNode(node, filename):
1795 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1798 class LUDumpClusterConfig(NoHooksLU):
1799 """Return a text-representation of the cluster-config.
1804 def CheckPrereq(self):
1805 """No prerequisites.
1810 def Exec(self, feedback_fn):
1811 """Dump a representation of the cluster config to the standard output.
1814 return self.cfg.DumpConfig()
1817 class LURunClusterCommand(NoHooksLU):
1818 """Run a command on some nodes.
1821 _OP_REQP = ["command", "nodes"]
1823 def CheckPrereq(self):
1824 """Check prerequisites.
1826 It checks that the given list of nodes is valid.
1829 self.nodes = _GetWantedNodes(self, self.op.nodes)
1831 def Exec(self, feedback_fn):
1832 """Run a command on some nodes.
1835 # put the master at the end of the nodes list
1836 master_node = self.sstore.GetMasterNode()
1837 if master_node in self.nodes:
1838 self.nodes.remove(master_node)
1839 self.nodes.append(master_node)
1842 for node in self.nodes:
1843 result = self.ssh.Run(node, "root", self.op.command)
1844 data.append((node, result.output, result.exit_code))
1849 class LUActivateInstanceDisks(NoHooksLU):
1850 """Bring up an instance's disks.
1853 _OP_REQP = ["instance_name"]
1855 def CheckPrereq(self):
1856 """Check prerequisites.
1858 This checks that the instance is in the cluster.
1861 instance = self.cfg.GetInstanceInfo(
1862 self.cfg.ExpandInstanceName(self.op.instance_name))
1863 if instance is None:
1864 raise errors.OpPrereqError("Instance '%s' not known" %
1865 self.op.instance_name)
1866 self.instance = instance
1869 def Exec(self, feedback_fn):
1870 """Activate the disks.
1873 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1875 raise errors.OpExecError("Cannot activate block devices")
1880 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1881 """Prepare the block devices for an instance.
1883 This sets up the block devices on all nodes.
1886 instance: a ganeti.objects.Instance object
1887 ignore_secondaries: if true, errors on secondary nodes won't result
1888 in an error return from the function
1891 false if the operation failed
1892 list of (host, instance_visible_name, node_visible_name) if the operation
1893 suceeded with the mapping from node devices to instance devices
1897 iname = instance.name
1898 # With the two passes mechanism we try to reduce the window of
1899 # opportunity for the race condition of switching DRBD to primary
1900 # before handshaking occured, but we do not eliminate it
1902 # The proper fix would be to wait (with some limits) until the
1903 # connection has been made and drbd transitions from WFConnection
1904 # into any other network-connected state (Connected, SyncTarget,
1907 # 1st pass, assemble on all nodes in secondary mode
1908 for inst_disk in instance.disks:
1909 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1910 cfg.SetDiskID(node_disk, node)
1911 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1913 logger.Error("could not prepare block device %s on node %s"
1914 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1915 if not ignore_secondaries:
1918 # FIXME: race condition on drbd migration to primary
1920 # 2nd pass, do only the primary node
1921 for inst_disk in instance.disks:
1922 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1923 if node != instance.primary_node:
1925 cfg.SetDiskID(node_disk, node)
1926 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1928 logger.Error("could not prepare block device %s on node %s"
1929 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1931 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1933 # leave the disks configured for the primary node
1934 # this is a workaround that would be fixed better by
1935 # improving the logical/physical id handling
1936 for disk in instance.disks:
1937 cfg.SetDiskID(disk, instance.primary_node)
1939 return disks_ok, device_info
1942 def _StartInstanceDisks(cfg, instance, force):
1943 """Start the disks of an instance.
1946 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1947 ignore_secondaries=force)
1949 _ShutdownInstanceDisks(instance, cfg)
1950 if force is not None and not force:
1951 logger.Error("If the message above refers to a secondary node,"
1952 " you can retry the operation using '--force'.")
1953 raise errors.OpExecError("Disk consistency error")
1956 class LUDeactivateInstanceDisks(NoHooksLU):
1957 """Shutdown an instance's disks.
1960 _OP_REQP = ["instance_name"]
1962 def CheckPrereq(self):
1963 """Check prerequisites.
1965 This checks that the instance is in the cluster.
1968 instance = self.cfg.GetInstanceInfo(
1969 self.cfg.ExpandInstanceName(self.op.instance_name))
1970 if instance is None:
1971 raise errors.OpPrereqError("Instance '%s' not known" %
1972 self.op.instance_name)
1973 self.instance = instance
1975 def Exec(self, feedback_fn):
1976 """Deactivate the disks
1979 instance = self.instance
1980 ins_l = rpc.call_instance_list([instance.primary_node])
1981 ins_l = ins_l[instance.primary_node]
1982 if not type(ins_l) is list:
1983 raise errors.OpExecError("Can't contact node '%s'" %
1984 instance.primary_node)
1986 if self.instance.name in ins_l:
1987 raise errors.OpExecError("Instance is running, can't shutdown"
1990 _ShutdownInstanceDisks(instance, self.cfg)
1993 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1994 """Shutdown block devices of an instance.
1996 This does the shutdown on all nodes of the instance.
1998 If the ignore_primary is false, errors on the primary node are
2003 for disk in instance.disks:
2004 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2005 cfg.SetDiskID(top_disk, node)
2006 if not rpc.call_blockdev_shutdown(node, top_disk):
2007 logger.Error("could not shutdown block device %s on node %s" %
2008 (disk.iv_name, node))
2009 if not ignore_primary or node != instance.primary_node:
2014 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2015 """Checks if a node has enough free memory.
2017 This function check if a given node has the needed amount of free
2018 memory. In case the node has less memory or we cannot get the
2019 information from the node, this function raise an OpPrereqError
2023 - cfg: a ConfigWriter instance
2024 - node: the node name
2025 - reason: string to use in the error message
2026 - requested: the amount of memory in MiB
2029 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2030 if not nodeinfo or not isinstance(nodeinfo, dict):
2031 raise errors.OpPrereqError("Could not contact node %s for resource"
2032 " information" % (node,))
2034 free_mem = nodeinfo[node].get('memory_free')
2035 if not isinstance(free_mem, int):
2036 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2037 " was '%s'" % (node, free_mem))
2038 if requested > free_mem:
2039 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2040 " needed %s MiB, available %s MiB" %
2041 (node, reason, requested, free_mem))
2044 class LUStartupInstance(LogicalUnit):
2045 """Starts an instance.
2048 HPATH = "instance-start"
2049 HTYPE = constants.HTYPE_INSTANCE
2050 _OP_REQP = ["instance_name", "force"]
2052 def BuildHooksEnv(self):
2055 This runs on master, primary and secondary nodes of the instance.
2059 "FORCE": self.op.force,
2061 env.update(_BuildInstanceHookEnvByObject(self.instance))
2062 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2063 list(self.instance.secondary_nodes))
2066 def CheckPrereq(self):
2067 """Check prerequisites.
2069 This checks that the instance is in the cluster.
2072 instance = self.cfg.GetInstanceInfo(
2073 self.cfg.ExpandInstanceName(self.op.instance_name))
2074 if instance is None:
2075 raise errors.OpPrereqError("Instance '%s' not known" %
2076 self.op.instance_name)
2078 # check bridges existance
2079 _CheckInstanceBridgesExist(instance)
2081 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2082 "starting instance %s" % instance.name,
2085 self.instance = instance
2086 self.op.instance_name = instance.name
2088 def Exec(self, feedback_fn):
2089 """Start the instance.
2092 instance = self.instance
2093 force = self.op.force
2094 extra_args = getattr(self.op, "extra_args", "")
2096 self.cfg.MarkInstanceUp(instance.name)
2098 node_current = instance.primary_node
2100 _StartInstanceDisks(self.cfg, instance, force)
2102 if not rpc.call_instance_start(node_current, instance, extra_args):
2103 _ShutdownInstanceDisks(instance, self.cfg)
2104 raise errors.OpExecError("Could not start instance")
2107 class LURebootInstance(LogicalUnit):
2108 """Reboot an instance.
2111 HPATH = "instance-reboot"
2112 HTYPE = constants.HTYPE_INSTANCE
2113 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2115 def BuildHooksEnv(self):
2118 This runs on master, primary and secondary nodes of the instance.
2122 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2124 env.update(_BuildInstanceHookEnvByObject(self.instance))
2125 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2126 list(self.instance.secondary_nodes))
2129 def CheckPrereq(self):
2130 """Check prerequisites.
2132 This checks that the instance is in the cluster.
2135 instance = self.cfg.GetInstanceInfo(
2136 self.cfg.ExpandInstanceName(self.op.instance_name))
2137 if instance is None:
2138 raise errors.OpPrereqError("Instance '%s' not known" %
2139 self.op.instance_name)
2141 # check bridges existance
2142 _CheckInstanceBridgesExist(instance)
2144 self.instance = instance
2145 self.op.instance_name = instance.name
2147 def Exec(self, feedback_fn):
2148 """Reboot the instance.
2151 instance = self.instance
2152 ignore_secondaries = self.op.ignore_secondaries
2153 reboot_type = self.op.reboot_type
2154 extra_args = getattr(self.op, "extra_args", "")
2156 node_current = instance.primary_node
2158 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2159 constants.INSTANCE_REBOOT_HARD,
2160 constants.INSTANCE_REBOOT_FULL]:
2161 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2162 (constants.INSTANCE_REBOOT_SOFT,
2163 constants.INSTANCE_REBOOT_HARD,
2164 constants.INSTANCE_REBOOT_FULL))
2166 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2167 constants.INSTANCE_REBOOT_HARD]:
2168 if not rpc.call_instance_reboot(node_current, instance,
2169 reboot_type, extra_args):
2170 raise errors.OpExecError("Could not reboot instance")
2172 if not rpc.call_instance_shutdown(node_current, instance):
2173 raise errors.OpExecError("could not shutdown instance for full reboot")
2174 _ShutdownInstanceDisks(instance, self.cfg)
2175 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2176 if not rpc.call_instance_start(node_current, instance, extra_args):
2177 _ShutdownInstanceDisks(instance, self.cfg)
2178 raise errors.OpExecError("Could not start instance for full reboot")
2180 self.cfg.MarkInstanceUp(instance.name)
2183 class LUShutdownInstance(LogicalUnit):
2184 """Shutdown an instance.
2187 HPATH = "instance-stop"
2188 HTYPE = constants.HTYPE_INSTANCE
2189 _OP_REQP = ["instance_name"]
2191 def BuildHooksEnv(self):
2194 This runs on master, primary and secondary nodes of the instance.
2197 env = _BuildInstanceHookEnvByObject(self.instance)
2198 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2199 list(self.instance.secondary_nodes))
2202 def CheckPrereq(self):
2203 """Check prerequisites.
2205 This checks that the instance is in the cluster.
2208 instance = self.cfg.GetInstanceInfo(
2209 self.cfg.ExpandInstanceName(self.op.instance_name))
2210 if instance is None:
2211 raise errors.OpPrereqError("Instance '%s' not known" %
2212 self.op.instance_name)
2213 self.instance = instance
2215 def Exec(self, feedback_fn):
2216 """Shutdown the instance.
2219 instance = self.instance
2220 node_current = instance.primary_node
2221 self.cfg.MarkInstanceDown(instance.name)
2222 if not rpc.call_instance_shutdown(node_current, instance):
2223 logger.Error("could not shutdown instance")
2225 _ShutdownInstanceDisks(instance, self.cfg)
2228 class LUReinstallInstance(LogicalUnit):
2229 """Reinstall an instance.
2232 HPATH = "instance-reinstall"
2233 HTYPE = constants.HTYPE_INSTANCE
2234 _OP_REQP = ["instance_name"]
2236 def BuildHooksEnv(self):
2239 This runs on master, primary and secondary nodes of the instance.
2242 env = _BuildInstanceHookEnvByObject(self.instance)
2243 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2244 list(self.instance.secondary_nodes))
2247 def CheckPrereq(self):
2248 """Check prerequisites.
2250 This checks that the instance is in the cluster and is not running.
2253 instance = self.cfg.GetInstanceInfo(
2254 self.cfg.ExpandInstanceName(self.op.instance_name))
2255 if instance is None:
2256 raise errors.OpPrereqError("Instance '%s' not known" %
2257 self.op.instance_name)
2258 if instance.disk_template == constants.DT_DISKLESS:
2259 raise errors.OpPrereqError("Instance '%s' has no disks" %
2260 self.op.instance_name)
2261 if instance.status != "down":
2262 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2263 self.op.instance_name)
2264 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2266 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2267 (self.op.instance_name,
2268 instance.primary_node))
2270 self.op.os_type = getattr(self.op, "os_type", None)
2271 if self.op.os_type is not None:
2273 pnode = self.cfg.GetNodeInfo(
2274 self.cfg.ExpandNodeName(instance.primary_node))
2276 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2278 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2280 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2281 " primary node" % self.op.os_type)
2283 self.instance = instance
2285 def Exec(self, feedback_fn):
2286 """Reinstall the instance.
2289 inst = self.instance
2291 if self.op.os_type is not None:
2292 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2293 inst.os = self.op.os_type
2294 self.cfg.AddInstance(inst)
2296 _StartInstanceDisks(self.cfg, inst, None)
2298 feedback_fn("Running the instance OS create scripts...")
2299 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2300 raise errors.OpExecError("Could not install OS for instance %s"
2302 (inst.name, inst.primary_node))
2304 _ShutdownInstanceDisks(inst, self.cfg)
2307 class LURenameInstance(LogicalUnit):
2308 """Rename an instance.
2311 HPATH = "instance-rename"
2312 HTYPE = constants.HTYPE_INSTANCE
2313 _OP_REQP = ["instance_name", "new_name"]
2315 def BuildHooksEnv(self):
2318 This runs on master, primary and secondary nodes of the instance.
2321 env = _BuildInstanceHookEnvByObject(self.instance)
2322 env["INSTANCE_NEW_NAME"] = self.op.new_name
2323 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2324 list(self.instance.secondary_nodes))
2327 def CheckPrereq(self):
2328 """Check prerequisites.
2330 This checks that the instance is in the cluster and is not running.
2333 instance = self.cfg.GetInstanceInfo(
2334 self.cfg.ExpandInstanceName(self.op.instance_name))
2335 if instance is None:
2336 raise errors.OpPrereqError("Instance '%s' not known" %
2337 self.op.instance_name)
2338 if instance.status != "down":
2339 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2340 self.op.instance_name)
2341 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2343 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2344 (self.op.instance_name,
2345 instance.primary_node))
2346 self.instance = instance
2348 # new name verification
2349 name_info = utils.HostInfo(self.op.new_name)
2351 self.op.new_name = new_name = name_info.name
2352 instance_list = self.cfg.GetInstanceList()
2353 if new_name in instance_list:
2354 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2357 if not getattr(self.op, "ignore_ip", False):
2358 command = ["fping", "-q", name_info.ip]
2359 result = utils.RunCmd(command)
2360 if not result.failed:
2361 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2362 (name_info.ip, new_name))
2365 def Exec(self, feedback_fn):
2366 """Reinstall the instance.
2369 inst = self.instance
2370 old_name = inst.name
2372 if inst.disk_template == constants.DT_FILE:
2373 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2375 self.cfg.RenameInstance(inst.name, self.op.new_name)
2377 # re-read the instance from the configuration after rename
2378 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2380 if inst.disk_template == constants.DT_FILE:
2381 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2382 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2383 old_file_storage_dir,
2384 new_file_storage_dir)
2387 raise errors.OpExecError("Could not connect to node '%s' to rename"
2388 " directory '%s' to '%s' (but the instance"
2389 " has been renamed in Ganeti)" % (
2390 inst.primary_node, old_file_storage_dir,
2391 new_file_storage_dir))
2394 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2395 " (but the instance has been renamed in"
2396 " Ganeti)" % (old_file_storage_dir,
2397 new_file_storage_dir))
2399 _StartInstanceDisks(self.cfg, inst, None)
2401 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2403 msg = ("Could run OS rename script for instance %s on node %s (but the"
2404 " instance has been renamed in Ganeti)" %
2405 (inst.name, inst.primary_node))
2408 _ShutdownInstanceDisks(inst, self.cfg)
2411 class LURemoveInstance(LogicalUnit):
2412 """Remove an instance.
2415 HPATH = "instance-remove"
2416 HTYPE = constants.HTYPE_INSTANCE
2417 _OP_REQP = ["instance_name"]
2419 def BuildHooksEnv(self):
2422 This runs on master, primary and secondary nodes of the instance.
2425 env = _BuildInstanceHookEnvByObject(self.instance)
2426 nl = [self.sstore.GetMasterNode()]
2429 def CheckPrereq(self):
2430 """Check prerequisites.
2432 This checks that the instance is in the cluster.
2435 instance = self.cfg.GetInstanceInfo(
2436 self.cfg.ExpandInstanceName(self.op.instance_name))
2437 if instance is None:
2438 raise errors.OpPrereqError("Instance '%s' not known" %
2439 self.op.instance_name)
2440 self.instance = instance
2442 def Exec(self, feedback_fn):
2443 """Remove the instance.
2446 instance = self.instance
2447 logger.Info("shutting down instance %s on node %s" %
2448 (instance.name, instance.primary_node))
2450 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2451 if self.op.ignore_failures:
2452 feedback_fn("Warning: can't shutdown instance")
2454 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2455 (instance.name, instance.primary_node))
2457 logger.Info("removing block devices for instance %s" % instance.name)
2459 if not _RemoveDisks(instance, self.cfg):
2460 if self.op.ignore_failures:
2461 feedback_fn("Warning: can't remove instance's disks")
2463 raise errors.OpExecError("Can't remove instance's disks")
2465 logger.Info("removing instance %s out of cluster config" % instance.name)
2467 self.cfg.RemoveInstance(instance.name)
2470 class LUQueryInstances(NoHooksLU):
2471 """Logical unit for querying instances.
2474 _OP_REQP = ["output_fields", "names"]
2476 def CheckPrereq(self):
2477 """Check prerequisites.
2479 This checks that the fields required are valid output fields.
2482 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2483 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2484 "admin_state", "admin_ram",
2485 "disk_template", "ip", "mac", "bridge",
2486 "sda_size", "sdb_size", "vcpus"],
2487 dynamic=self.dynamic_fields,
2488 selected=self.op.output_fields)
2490 self.wanted = _GetWantedInstances(self, self.op.names)
2492 def Exec(self, feedback_fn):
2493 """Computes the list of nodes and their attributes.
2496 instance_names = self.wanted
2497 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2500 # begin data gathering
2502 nodes = frozenset([inst.primary_node for inst in instance_list])
2505 if self.dynamic_fields.intersection(self.op.output_fields):
2507 node_data = rpc.call_all_instances_info(nodes)
2509 result = node_data[name]
2511 live_data.update(result)
2512 elif result == False:
2513 bad_nodes.append(name)
2514 # else no instance is alive
2516 live_data = dict([(name, {}) for name in instance_names])
2518 # end data gathering
2521 for instance in instance_list:
2523 for field in self.op.output_fields:
2528 elif field == "pnode":
2529 val = instance.primary_node
2530 elif field == "snodes":
2531 val = list(instance.secondary_nodes)
2532 elif field == "admin_state":
2533 val = (instance.status != "down")
2534 elif field == "oper_state":
2535 if instance.primary_node in bad_nodes:
2538 val = bool(live_data.get(instance.name))
2539 elif field == "status":
2540 if instance.primary_node in bad_nodes:
2541 val = "ERROR_nodedown"
2543 running = bool(live_data.get(instance.name))
2545 if instance.status != "down":
2550 if instance.status != "down":
2554 elif field == "admin_ram":
2555 val = instance.memory
2556 elif field == "oper_ram":
2557 if instance.primary_node in bad_nodes:
2559 elif instance.name in live_data:
2560 val = live_data[instance.name].get("memory", "?")
2563 elif field == "disk_template":
2564 val = instance.disk_template
2566 val = instance.nics[0].ip
2567 elif field == "bridge":
2568 val = instance.nics[0].bridge
2569 elif field == "mac":
2570 val = instance.nics[0].mac
2571 elif field == "sda_size" or field == "sdb_size":
2572 disk = instance.FindDisk(field[:3])
2577 elif field == "vcpus":
2578 val = instance.vcpus
2580 raise errors.ParameterError(field)
2587 class LUFailoverInstance(LogicalUnit):
2588 """Failover an instance.
2591 HPATH = "instance-failover"
2592 HTYPE = constants.HTYPE_INSTANCE
2593 _OP_REQP = ["instance_name", "ignore_consistency"]
2595 def BuildHooksEnv(self):
2598 This runs on master, primary and secondary nodes of the instance.
2602 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2604 env.update(_BuildInstanceHookEnvByObject(self.instance))
2605 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2608 def CheckPrereq(self):
2609 """Check prerequisites.
2611 This checks that the instance is in the cluster.
2614 instance = self.cfg.GetInstanceInfo(
2615 self.cfg.ExpandInstanceName(self.op.instance_name))
2616 if instance is None:
2617 raise errors.OpPrereqError("Instance '%s' not known" %
2618 self.op.instance_name)
2620 if instance.disk_template not in constants.DTS_NET_MIRROR:
2621 raise errors.OpPrereqError("Instance's disk layout is not"
2622 " network mirrored, cannot failover.")
2624 secondary_nodes = instance.secondary_nodes
2625 if not secondary_nodes:
2626 raise errors.ProgrammerError("no secondary node but using "
2627 "DT_REMOTE_RAID1 template")
2629 target_node = secondary_nodes[0]
2630 # check memory requirements on the secondary node
2631 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2632 instance.name, instance.memory)
2634 # check bridge existance
2635 brlist = [nic.bridge for nic in instance.nics]
2636 if not rpc.call_bridges_exist(target_node, brlist):
2637 raise errors.OpPrereqError("One or more target bridges %s does not"
2638 " exist on destination node '%s'" %
2639 (brlist, target_node))
2641 self.instance = instance
2643 def Exec(self, feedback_fn):
2644 """Failover an instance.
2646 The failover is done by shutting it down on its present node and
2647 starting it on the secondary.
2650 instance = self.instance
2652 source_node = instance.primary_node
2653 target_node = instance.secondary_nodes[0]
2655 feedback_fn("* checking disk consistency between source and target")
2656 for dev in instance.disks:
2657 # for remote_raid1, these are md over drbd
2658 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2659 if instance.status == "up" and not self.op.ignore_consistency:
2660 raise errors.OpExecError("Disk %s is degraded on target node,"
2661 " aborting failover." % dev.iv_name)
2663 feedback_fn("* shutting down instance on source node")
2664 logger.Info("Shutting down instance %s on node %s" %
2665 (instance.name, source_node))
2667 if not rpc.call_instance_shutdown(source_node, instance):
2668 if self.op.ignore_consistency:
2669 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2670 " anyway. Please make sure node %s is down" %
2671 (instance.name, source_node, source_node))
2673 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2674 (instance.name, source_node))
2676 feedback_fn("* deactivating the instance's disks on source node")
2677 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2678 raise errors.OpExecError("Can't shut down the instance's disks.")
2680 instance.primary_node = target_node
2681 # distribute new instance config to the other nodes
2682 self.cfg.AddInstance(instance)
2684 # Only start the instance if it's marked as up
2685 if instance.status == "up":
2686 feedback_fn("* activating the instance's disks on target node")
2687 logger.Info("Starting instance %s on node %s" %
2688 (instance.name, target_node))
2690 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2691 ignore_secondaries=True)
2693 _ShutdownInstanceDisks(instance, self.cfg)
2694 raise errors.OpExecError("Can't activate the instance's disks")
2696 feedback_fn("* starting the instance on the target node")
2697 if not rpc.call_instance_start(target_node, instance, None):
2698 _ShutdownInstanceDisks(instance, self.cfg)
2699 raise errors.OpExecError("Could not start instance %s on node %s." %
2700 (instance.name, target_node))
2703 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2704 """Create a tree of block devices on the primary node.
2706 This always creates all devices.
2710 for child in device.children:
2711 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2714 cfg.SetDiskID(device, node)
2715 new_id = rpc.call_blockdev_create(node, device, device.size,
2716 instance.name, True, info)
2719 if device.physical_id is None:
2720 device.physical_id = new_id
2724 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2725 """Create a tree of block devices on a secondary node.
2727 If this device type has to be created on secondaries, create it and
2730 If not, just recurse to children keeping the same 'force' value.
2733 if device.CreateOnSecondary():
2736 for child in device.children:
2737 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2738 child, force, info):
2743 cfg.SetDiskID(device, node)
2744 new_id = rpc.call_blockdev_create(node, device, device.size,
2745 instance.name, False, info)
2748 if device.physical_id is None:
2749 device.physical_id = new_id
2753 def _GenerateUniqueNames(cfg, exts):
2754 """Generate a suitable LV name.
2756 This will generate a logical volume name for the given instance.
2761 new_id = cfg.GenerateUniqueID()
2762 results.append("%s%s" % (new_id, val))
2766 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2767 """Generate a drbd device complete with its children.
2770 port = cfg.AllocatePort()
2771 vgname = cfg.GetVGName()
2772 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2773 logical_id=(vgname, names[0]))
2774 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2775 logical_id=(vgname, names[1]))
2776 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2777 logical_id = (primary, secondary, port),
2778 children = [dev_data, dev_meta])
2782 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2783 """Generate a drbd8 device complete with its children.
2786 port = cfg.AllocatePort()
2787 vgname = cfg.GetVGName()
2788 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2789 logical_id=(vgname, names[0]))
2790 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2791 logical_id=(vgname, names[1]))
2792 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2793 logical_id = (primary, secondary, port),
2794 children = [dev_data, dev_meta],
2799 def _GenerateDiskTemplate(cfg, template_name,
2800 instance_name, primary_node,
2801 secondary_nodes, disk_sz, swap_sz,
2802 file_storage_dir, file_driver):
2803 """Generate the entire disk layout for a given template type.
2806 #TODO: compute space requirements
2808 vgname = cfg.GetVGName()
2809 if template_name == constants.DT_DISKLESS:
2811 elif template_name == constants.DT_PLAIN:
2812 if len(secondary_nodes) != 0:
2813 raise errors.ProgrammerError("Wrong template configuration")
2815 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2816 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2817 logical_id=(vgname, names[0]),
2819 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2820 logical_id=(vgname, names[1]),
2822 disks = [sda_dev, sdb_dev]
2823 elif template_name == constants.DT_DRBD8:
2824 if len(secondary_nodes) != 1:
2825 raise errors.ProgrammerError("Wrong template configuration")
2826 remote_node = secondary_nodes[0]
2827 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2828 ".sdb_data", ".sdb_meta"])
2829 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2830 disk_sz, names[0:2], "sda")
2831 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2832 swap_sz, names[2:4], "sdb")
2833 disks = [drbd_sda_dev, drbd_sdb_dev]
2834 elif template_name == constants.DT_FILE:
2835 if len(secondary_nodes) != 0:
2836 raise errors.ProgrammerError("Wrong template configuration")
2838 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2839 iv_name="sda", logical_id=(file_driver,
2840 "%s/sda" % file_storage_dir))
2841 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2842 iv_name="sdb", logical_id=(file_driver,
2843 "%s/sdb" % file_storage_dir))
2844 disks = [file_sda_dev, file_sdb_dev]
2846 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2850 def _GetInstanceInfoText(instance):
2851 """Compute that text that should be added to the disk's metadata.
2854 return "originstname+%s" % instance.name
2857 def _CreateDisks(cfg, instance):
2858 """Create all disks for an instance.
2860 This abstracts away some work from AddInstance.
2863 instance: the instance object
2866 True or False showing the success of the creation process
2869 info = _GetInstanceInfoText(instance)
2871 if instance.disk_template == constants.DT_FILE:
2872 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2873 result = rpc.call_file_storage_dir_create(instance.primary_node,
2877 logger.Error("Could not connect to node '%s'" % inst.primary_node)
2881 logger.Error("failed to create directory '%s'" % file_storage_dir)
2884 for device in instance.disks:
2885 logger.Info("creating volume %s for instance %s" %
2886 (device.iv_name, instance.name))
2888 for secondary_node in instance.secondary_nodes:
2889 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2890 device, False, info):
2891 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2892 (device.iv_name, device, secondary_node))
2895 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2896 instance, device, info):
2897 logger.Error("failed to create volume %s on primary!" %
2903 def _RemoveDisks(instance, cfg):
2904 """Remove all disks for an instance.
2906 This abstracts away some work from `AddInstance()` and
2907 `RemoveInstance()`. Note that in case some of the devices couldn't
2908 be removed, the removal will continue with the other ones (compare
2909 with `_CreateDisks()`).
2912 instance: the instance object
2915 True or False showing the success of the removal proces
2918 logger.Info("removing block devices for instance %s" % instance.name)
2921 for device in instance.disks:
2922 for node, disk in device.ComputeNodeTree(instance.primary_node):
2923 cfg.SetDiskID(disk, node)
2924 if not rpc.call_blockdev_remove(node, disk):
2925 logger.Error("could not remove block device %s on node %s,"
2926 " continuing anyway" %
2927 (device.iv_name, node))
2930 if instance.disk_template == constants.DT_FILE:
2931 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2932 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2934 logger.Error("could not remove directory '%s'" % file_storage_dir)
2940 class LUCreateInstance(LogicalUnit):
2941 """Create an instance.
2944 HPATH = "instance-add"
2945 HTYPE = constants.HTYPE_INSTANCE
2946 _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
2947 "disk_template", "swap_size", "mode", "start", "vcpus",
2948 "wait_for_sync", "ip_check", "mac"]
2950 def BuildHooksEnv(self):
2953 This runs on master, primary and secondary nodes of the instance.
2957 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2958 "INSTANCE_DISK_SIZE": self.op.disk_size,
2959 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2960 "INSTANCE_ADD_MODE": self.op.mode,
2962 if self.op.mode == constants.INSTANCE_IMPORT:
2963 env["INSTANCE_SRC_NODE"] = self.op.src_node
2964 env["INSTANCE_SRC_PATH"] = self.op.src_path
2965 env["INSTANCE_SRC_IMAGE"] = self.src_image
2967 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2968 primary_node=self.op.pnode,
2969 secondary_nodes=self.secondaries,
2970 status=self.instance_status,
2971 os_type=self.op.os_type,
2972 memory=self.op.mem_size,
2973 vcpus=self.op.vcpus,
2974 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2977 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2982 def CheckPrereq(self):
2983 """Check prerequisites.
2986 for attr in ["kernel_path", "initrd_path", "hvm_boot_order"]:
2987 if not hasattr(self.op, attr):
2988 setattr(self.op, attr, None)
2990 if self.op.mode not in (constants.INSTANCE_CREATE,
2991 constants.INSTANCE_IMPORT):
2992 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2995 if (not self.cfg.GetVGName() and
2996 self.op.disk_template not in constants.DTS_NOT_LVM):
2997 raise errors.OpPrereqError("Cluster does not support lvm-based"
3000 if self.op.mode == constants.INSTANCE_IMPORT:
3001 src_node = getattr(self.op, "src_node", None)
3002 src_path = getattr(self.op, "src_path", None)
3003 if src_node is None or src_path is None:
3004 raise errors.OpPrereqError("Importing an instance requires source"
3005 " node and path options")
3006 src_node_full = self.cfg.ExpandNodeName(src_node)
3007 if src_node_full is None:
3008 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3009 self.op.src_node = src_node = src_node_full
3011 if not os.path.isabs(src_path):
3012 raise errors.OpPrereqError("The source path must be absolute")
3014 export_info = rpc.call_export_info(src_node, src_path)
3017 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3019 if not export_info.has_section(constants.INISECT_EXP):
3020 raise errors.ProgrammerError("Corrupted export config")
3022 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3023 if (int(ei_version) != constants.EXPORT_VERSION):
3024 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3025 (ei_version, constants.EXPORT_VERSION))
3027 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3028 raise errors.OpPrereqError("Can't import instance with more than"
3031 # FIXME: are the old os-es, disk sizes, etc. useful?
3032 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3033 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3035 self.src_image = diskimage
3036 else: # INSTANCE_CREATE
3037 if getattr(self.op, "os_type", None) is None:
3038 raise errors.OpPrereqError("No guest OS specified")
3040 # check primary node
3041 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3043 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3045 self.op.pnode = pnode.name
3047 self.secondaries = []
3048 # disk template and mirror node verification
3049 if self.op.disk_template not in constants.DISK_TEMPLATES:
3050 raise errors.OpPrereqError("Invalid disk template name")
3052 if (self.op.file_driver and
3053 not self.op.file_driver in constants.FILE_DRIVER):
3054 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3055 self.op.file_driver)
3057 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3058 raise errors.OpPrereqError("File storage directory not a relative"
3061 if self.op.disk_template in constants.DTS_NET_MIRROR:
3062 if getattr(self.op, "snode", None) is None:
3063 raise errors.OpPrereqError("The networked disk templates need"
3066 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3067 if snode_name is None:
3068 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3070 elif snode_name == pnode.name:
3071 raise errors.OpPrereqError("The secondary node cannot be"
3072 " the primary node.")
3073 self.secondaries.append(snode_name)
3075 # Required free disk space as a function of disk and swap space
3077 constants.DT_DISKLESS: None,
3078 constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
3079 # 256 MB are added for drbd metadata, 128MB for each drbd device
3080 constants.DT_DRBD8: self.op.disk_size + self.op.swap_size + 256,
3081 constants.DT_FILE: None,
3084 if self.op.disk_template not in req_size_dict:
3085 raise errors.ProgrammerError("Disk template '%s' size requirement"
3086 " is unknown" % self.op.disk_template)
3088 req_size = req_size_dict[self.op.disk_template]
3090 # Check lv size requirements
3091 if req_size is not None:
3092 nodenames = [pnode.name] + self.secondaries
3093 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3094 for node in nodenames:
3095 info = nodeinfo.get(node, None)
3097 raise errors.OpPrereqError("Cannot get current information"
3098 " from node '%s'" % nodeinfo)
3099 vg_free = info.get('vg_free', None)
3100 if not isinstance(vg_free, int):
3101 raise errors.OpPrereqError("Can't compute free disk space on"
3103 if req_size > info['vg_free']:
3104 raise errors.OpPrereqError("Not enough disk space on target node %s."
3105 " %d MB available, %d MB required" %
3106 (node, info['vg_free'], req_size))
3109 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3111 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3112 " primary node" % self.op.os_type)
3114 if self.op.kernel_path == constants.VALUE_NONE:
3115 raise errors.OpPrereqError("Can't set instance kernel to none")
3117 # instance verification
3118 hostname1 = utils.HostInfo(self.op.instance_name)
3120 self.op.instance_name = instance_name = hostname1.name
3121 instance_list = self.cfg.GetInstanceList()
3122 if instance_name in instance_list:
3123 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3126 ip = getattr(self.op, "ip", None)
3127 if ip is None or ip.lower() == "none":
3129 elif ip.lower() == "auto":
3130 inst_ip = hostname1.ip
3132 if not utils.IsValidIP(ip):
3133 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3134 " like a valid IP" % ip)
3136 self.inst_ip = inst_ip
3138 if self.op.start and not self.op.ip_check:
3139 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3140 " adding an instance in start mode")
3142 if self.op.ip_check:
3143 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3144 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3145 (hostname1.ip, instance_name))
3147 # MAC address verification
3148 if self.op.mac != "auto":
3149 if not utils.IsValidMac(self.op.mac.lower()):
3150 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3153 # bridge verification
3154 bridge = getattr(self.op, "bridge", None)
3156 self.op.bridge = self.cfg.GetDefBridge()
3158 self.op.bridge = bridge
3160 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3161 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3162 " destination node '%s'" %
3163 (self.op.bridge, pnode.name))
3165 # boot order verification
3166 if self.op.hvm_boot_order is not None:
3167 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3168 raise errors.OpPrereqError("invalid boot order specified,"
3169 " must be one or more of [acdn]")
3172 self.instance_status = 'up'
3174 self.instance_status = 'down'
3176 def Exec(self, feedback_fn):
3177 """Create and add the instance to the cluster.
3180 instance = self.op.instance_name
3181 pnode_name = self.pnode.name
3183 if self.op.mac == "auto":
3184 mac_address = self.cfg.GenerateMAC()
3186 mac_address = self.op.mac
3188 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3189 if self.inst_ip is not None:
3190 nic.ip = self.inst_ip
3192 ht_kind = self.sstore.GetHypervisorType()
3193 if ht_kind in constants.HTS_REQ_PORT:
3194 network_port = self.cfg.AllocatePort()
3198 # build the full file storage dir path
3199 file_storage_dir = os.path.normpath(os.path.join(
3200 self.sstore.GetFileStorageDir(),
3201 self.op.file_storage_dir, instance))
3204 disks = _GenerateDiskTemplate(self.cfg,
3205 self.op.disk_template,
3206 instance, pnode_name,
3207 self.secondaries, self.op.disk_size,
3210 self.op.file_driver)
3212 iobj = objects.Instance(name=instance, os=self.op.os_type,
3213 primary_node=pnode_name,
3214 memory=self.op.mem_size,
3215 vcpus=self.op.vcpus,
3216 nics=[nic], disks=disks,
3217 disk_template=self.op.disk_template,
3218 status=self.instance_status,
3219 network_port=network_port,
3220 kernel_path=self.op.kernel_path,
3221 initrd_path=self.op.initrd_path,
3222 hvm_boot_order=self.op.hvm_boot_order,
3225 feedback_fn("* creating instance disks...")
3226 if not _CreateDisks(self.cfg, iobj):
3227 _RemoveDisks(iobj, self.cfg)
3228 raise errors.OpExecError("Device creation failed, reverting...")
3230 feedback_fn("adding instance %s to cluster config" % instance)
3232 self.cfg.AddInstance(iobj)
3234 if self.op.wait_for_sync:
3235 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3236 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3237 # make sure the disks are not degraded (still sync-ing is ok)
3239 feedback_fn("* checking mirrors status")
3240 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3245 _RemoveDisks(iobj, self.cfg)
3246 self.cfg.RemoveInstance(iobj.name)
3247 raise errors.OpExecError("There are some degraded disks for"
3250 feedback_fn("creating os for instance %s on node %s" %
3251 (instance, pnode_name))
3253 if iobj.disk_template != constants.DT_DISKLESS:
3254 if self.op.mode == constants.INSTANCE_CREATE:
3255 feedback_fn("* running the instance OS create scripts...")
3256 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3257 raise errors.OpExecError("could not add os for instance %s"
3259 (instance, pnode_name))
3261 elif self.op.mode == constants.INSTANCE_IMPORT:
3262 feedback_fn("* running the instance OS import scripts...")
3263 src_node = self.op.src_node
3264 src_image = self.src_image
3265 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3266 src_node, src_image):
3267 raise errors.OpExecError("Could not import os for instance"
3269 (instance, pnode_name))
3271 # also checked in the prereq part
3272 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3276 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3277 feedback_fn("* starting instance...")
3278 if not rpc.call_instance_start(pnode_name, iobj, None):
3279 raise errors.OpExecError("Could not start instance")
3282 class LUConnectConsole(NoHooksLU):
3283 """Connect to an instance's console.
3285 This is somewhat special in that it returns the command line that
3286 you need to run on the master node in order to connect to the
3290 _OP_REQP = ["instance_name"]
3292 def CheckPrereq(self):
3293 """Check prerequisites.
3295 This checks that the instance is in the cluster.
3298 instance = self.cfg.GetInstanceInfo(
3299 self.cfg.ExpandInstanceName(self.op.instance_name))
3300 if instance is None:
3301 raise errors.OpPrereqError("Instance '%s' not known" %
3302 self.op.instance_name)
3303 self.instance = instance
3305 def Exec(self, feedback_fn):
3306 """Connect to the console of an instance
3309 instance = self.instance
3310 node = instance.primary_node
3312 node_insts = rpc.call_instance_list([node])[node]
3313 if node_insts is False:
3314 raise errors.OpExecError("Can't connect to node %s." % node)
3316 if instance.name not in node_insts:
3317 raise errors.OpExecError("Instance %s is not running." % instance.name)
3319 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3321 hyper = hypervisor.GetHypervisor()
3322 console_cmd = hyper.GetShellCommandForConsole(instance)
3325 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3328 class LUReplaceDisks(LogicalUnit):
3329 """Replace the disks of an instance.
3332 HPATH = "mirrors-replace"
3333 HTYPE = constants.HTYPE_INSTANCE
3334 _OP_REQP = ["instance_name", "mode", "disks"]
3336 def BuildHooksEnv(self):
3339 This runs on the master, the primary and all the secondaries.
3343 "MODE": self.op.mode,
3344 "NEW_SECONDARY": self.op.remote_node,
3345 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3347 env.update(_BuildInstanceHookEnvByObject(self.instance))
3349 self.sstore.GetMasterNode(),
3350 self.instance.primary_node,
3352 if self.op.remote_node is not None:
3353 nl.append(self.op.remote_node)
3356 def CheckPrereq(self):
3357 """Check prerequisites.
3359 This checks that the instance is in the cluster.
3362 instance = self.cfg.GetInstanceInfo(
3363 self.cfg.ExpandInstanceName(self.op.instance_name))
3364 if instance is None:
3365 raise errors.OpPrereqError("Instance '%s' not known" %
3366 self.op.instance_name)
3367 self.instance = instance
3368 self.op.instance_name = instance.name
3370 if instance.disk_template not in constants.DTS_NET_MIRROR:
3371 raise errors.OpPrereqError("Instance's disk layout is not"
3372 " network mirrored.")
3374 if len(instance.secondary_nodes) != 1:
3375 raise errors.OpPrereqError("The instance has a strange layout,"
3376 " expected one secondary but found %d" %
3377 len(instance.secondary_nodes))
3379 self.sec_node = instance.secondary_nodes[0]
3381 remote_node = getattr(self.op, "remote_node", None)
3382 if remote_node is not None:
3383 remote_node = self.cfg.ExpandNodeName(remote_node)
3384 if remote_node is None:
3385 raise errors.OpPrereqError("Node '%s' not known" %
3386 self.op.remote_node)
3387 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3389 self.remote_node_info = None
3390 if remote_node == instance.primary_node:
3391 raise errors.OpPrereqError("The specified node is the primary node of"
3393 elif remote_node == self.sec_node:
3394 if self.op.mode == constants.REPLACE_DISK_SEC:
3395 # this is for DRBD8, where we can't execute the same mode of
3396 # replacement as for drbd7 (no different port allocated)
3397 raise errors.OpPrereqError("Same secondary given, cannot execute"
3399 # the user gave the current secondary, switch to
3400 # 'no-replace-secondary' mode for drbd7
3402 if (instance.disk_template == constants.DT_REMOTE_RAID1 and
3403 self.op.mode != constants.REPLACE_DISK_ALL):
3404 raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
3405 " disks replacement, not individual ones")
3406 if instance.disk_template == constants.DT_DRBD8:
3407 if (self.op.mode == constants.REPLACE_DISK_ALL and
3408 remote_node is not None):
3409 # switch to replace secondary mode
3410 self.op.mode = constants.REPLACE_DISK_SEC
3412 if self.op.mode == constants.REPLACE_DISK_ALL:
3413 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3414 " secondary disk replacement, not"
3416 elif self.op.mode == constants.REPLACE_DISK_PRI:
3417 if remote_node is not None:
3418 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3419 " the secondary while doing a primary"
3420 " node disk replacement")
3421 self.tgt_node = instance.primary_node
3422 self.oth_node = instance.secondary_nodes[0]
3423 elif self.op.mode == constants.REPLACE_DISK_SEC:
3424 self.new_node = remote_node # this can be None, in which case
3425 # we don't change the secondary
3426 self.tgt_node = instance.secondary_nodes[0]
3427 self.oth_node = instance.primary_node
3429 raise errors.ProgrammerError("Unhandled disk replace mode")
3431 for name in self.op.disks:
3432 if instance.FindDisk(name) is None:
3433 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3434 (name, instance.name))
3435 self.op.remote_node = remote_node
3437 def _ExecRR1(self, feedback_fn):
3438 """Replace the disks of an instance.
3441 instance = self.instance
3444 if self.op.remote_node is None:
3445 remote_node = self.sec_node
3447 remote_node = self.op.remote_node
3449 for dev in instance.disks:
3451 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3452 names = _GenerateUniqueNames(cfg, lv_names)
3453 new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
3454 remote_node, size, names)
3455 iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
3456 logger.Info("adding new mirror component on secondary for %s" %
3459 if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
3461 _GetInstanceInfoText(instance)):
3462 raise errors.OpExecError("Failed to create new component on secondary"
3463 " node %s. Full abort, cleanup manually!" %
3466 logger.Info("adding new mirror component on primary")
3468 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
3470 _GetInstanceInfoText(instance)):
3471 # remove secondary dev
3472 cfg.SetDiskID(new_drbd, remote_node)
3473 rpc.call_blockdev_remove(remote_node, new_drbd)
3474 raise errors.OpExecError("Failed to create volume on primary!"
3475 " Full abort, cleanup manually!!")
3477 # the device exists now
3478 # call the primary node to add the mirror to md
3479 logger.Info("adding new mirror component to md")
3480 if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
3482 logger.Error("Can't add mirror compoment to md!")
3483 cfg.SetDiskID(new_drbd, remote_node)
3484 if not rpc.call_blockdev_remove(remote_node, new_drbd):
3485 logger.Error("Can't rollback on secondary")
3486 cfg.SetDiskID(new_drbd, instance.primary_node)
3487 if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
3488 logger.Error("Can't rollback on primary")
3489 raise errors.OpExecError("Full abort, cleanup manually!!")
3491 dev.children.append(new_drbd)
3492 cfg.AddInstance(instance)
3494 # this can fail as the old devices are degraded and _WaitForSync
3495 # does a combined result over all disks, so we don't check its
3497 _WaitForSync(cfg, instance, self.proc, unlock=True)
3499 # so check manually all the devices
3500 for name in iv_names:
3501 dev, child, new_drbd = iv_names[name]
3502 cfg.SetDiskID(dev, instance.primary_node)
3503 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3505 raise errors.OpExecError("MD device %s is degraded!" % name)
3506 cfg.SetDiskID(new_drbd, instance.primary_node)
3507 is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
3509 raise errors.OpExecError("New drbd device %s is degraded!" % name)
3511 for name in iv_names:
3512 dev, child, new_drbd = iv_names[name]
3513 logger.Info("remove mirror %s component" % name)
3514 cfg.SetDiskID(dev, instance.primary_node)
3515 if not rpc.call_blockdev_removechildren(instance.primary_node,
3517 logger.Error("Can't remove child from mirror, aborting"
3518 " *this device cleanup*.\nYou need to cleanup manually!!")
3521 for node in child.logical_id[:2]:
3522 logger.Info("remove child device on %s" % node)
3523 cfg.SetDiskID(child, node)
3524 if not rpc.call_blockdev_remove(node, child):
3525 logger.Error("Warning: failed to remove device from node %s,"
3526 " continuing operation." % node)
3528 dev.children.remove(child)
3530 cfg.AddInstance(instance)
3532 def _ExecD8DiskOnly(self, feedback_fn):
3533 """Replace a disk on the primary or secondary for dbrd8.
3535 The algorithm for replace is quite complicated:
3536 - for each disk to be replaced:
3537 - create new LVs on the target node with unique names
3538 - detach old LVs from the drbd device
3539 - rename old LVs to name_replaced.<time_t>
3540 - rename new LVs to old LVs
3541 - attach the new LVs (with the old names now) to the drbd device
3542 - wait for sync across all devices
3543 - for each modified disk:
3544 - remove old LVs (which have the name name_replaces.<time_t>)
3546 Failures are not very well handled.
3550 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3551 instance = self.instance
3553 vgname = self.cfg.GetVGName()
3556 tgt_node = self.tgt_node
3557 oth_node = self.oth_node
3559 # Step: check device activation
3560 self.proc.LogStep(1, steps_total, "check device existence")
3561 info("checking volume groups")
3562 my_vg = cfg.GetVGName()
3563 results = rpc.call_vg_list([oth_node, tgt_node])
3565 raise errors.OpExecError("Can't list volume groups on the nodes")
3566 for node in oth_node, tgt_node:
3567 res = results.get(node, False)
3568 if not res or my_vg not in res:
3569 raise errors.OpExecError("Volume group '%s' not found on %s" %
3571 for dev in instance.disks:
3572 if not dev.iv_name in self.op.disks:
3574 for node in tgt_node, oth_node:
3575 info("checking %s on %s" % (dev.iv_name, node))
3576 cfg.SetDiskID(dev, node)
3577 if not rpc.call_blockdev_find(node, dev):
3578 raise errors.OpExecError("Can't find device %s on node %s" %
3579 (dev.iv_name, node))
3581 # Step: check other node consistency
3582 self.proc.LogStep(2, steps_total, "check peer consistency")
3583 for dev in instance.disks:
3584 if not dev.iv_name in self.op.disks:
3586 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3587 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3588 oth_node==instance.primary_node):
3589 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3590 " to replace disks on this node (%s)" %
3591 (oth_node, tgt_node))
3593 # Step: create new storage
3594 self.proc.LogStep(3, steps_total, "allocate new storage")
3595 for dev in instance.disks:
3596 if not dev.iv_name in self.op.disks:
3599 cfg.SetDiskID(dev, tgt_node)
3600 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3601 names = _GenerateUniqueNames(cfg, lv_names)
3602 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3603 logical_id=(vgname, names[0]))
3604 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3605 logical_id=(vgname, names[1]))
3606 new_lvs = [lv_data, lv_meta]
3607 old_lvs = dev.children
3608 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3609 info("creating new local storage on %s for %s" %
3610 (tgt_node, dev.iv_name))
3611 # since we *always* want to create this LV, we use the
3612 # _Create...OnPrimary (which forces the creation), even if we
3613 # are talking about the secondary node
3614 for new_lv in new_lvs:
3615 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3616 _GetInstanceInfoText(instance)):
3617 raise errors.OpExecError("Failed to create new LV named '%s' on"
3619 (new_lv.logical_id[1], tgt_node))
3621 # Step: for each lv, detach+rename*2+attach
3622 self.proc.LogStep(4, steps_total, "change drbd configuration")
3623 for dev, old_lvs, new_lvs in iv_names.itervalues():
3624 info("detaching %s drbd from local storage" % dev.iv_name)
3625 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3626 raise errors.OpExecError("Can't detach drbd from local storage on node"
3627 " %s for device %s" % (tgt_node, dev.iv_name))
3629 #cfg.Update(instance)
3631 # ok, we created the new LVs, so now we know we have the needed
3632 # storage; as such, we proceed on the target node to rename
3633 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3634 # using the assumption that logical_id == physical_id (which in
3635 # turn is the unique_id on that node)
3637 # FIXME(iustin): use a better name for the replaced LVs
3638 temp_suffix = int(time.time())
3639 ren_fn = lambda d, suff: (d.physical_id[0],
3640 d.physical_id[1] + "_replaced-%s" % suff)
3641 # build the rename list based on what LVs exist on the node
3643 for to_ren in old_lvs:
3644 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3645 if find_res is not None: # device exists
3646 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3648 info("renaming the old LVs on the target node")
3649 if not rpc.call_blockdev_rename(tgt_node, rlist):
3650 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3651 # now we rename the new LVs to the old LVs
3652 info("renaming the new LVs on the target node")
3653 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3654 if not rpc.call_blockdev_rename(tgt_node, rlist):
3655 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3657 for old, new in zip(old_lvs, new_lvs):
3658 new.logical_id = old.logical_id
3659 cfg.SetDiskID(new, tgt_node)
3661 for disk in old_lvs:
3662 disk.logical_id = ren_fn(disk, temp_suffix)
3663 cfg.SetDiskID(disk, tgt_node)
3665 # now that the new lvs have the old name, we can add them to the device
3666 info("adding new mirror component on %s" % tgt_node)
3667 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3668 for new_lv in new_lvs:
3669 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3670 warning("Can't rollback device %s", hint="manually cleanup unused"
3672 raise errors.OpExecError("Can't add local storage to drbd")
3674 dev.children = new_lvs
3675 cfg.Update(instance)
3677 # Step: wait for sync
3679 # this can fail as the old devices are degraded and _WaitForSync
3680 # does a combined result over all disks, so we don't check its
3682 self.proc.LogStep(5, steps_total, "sync devices")
3683 _WaitForSync(cfg, instance, self.proc, unlock=True)
3685 # so check manually all the devices
3686 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3687 cfg.SetDiskID(dev, instance.primary_node)
3688 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3690 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3692 # Step: remove old storage
3693 self.proc.LogStep(6, steps_total, "removing old storage")
3694 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3695 info("remove logical volumes for %s" % name)
3697 cfg.SetDiskID(lv, tgt_node)
3698 if not rpc.call_blockdev_remove(tgt_node, lv):
3699 warning("Can't remove old LV", hint="manually remove unused LVs")
3702 def _ExecD8Secondary(self, feedback_fn):
3703 """Replace the secondary node for drbd8.
3705 The algorithm for replace is quite complicated:
3706 - for all disks of the instance:
3707 - create new LVs on the new node with same names
3708 - shutdown the drbd device on the old secondary
3709 - disconnect the drbd network on the primary
3710 - create the drbd device on the new secondary
3711 - network attach the drbd on the primary, using an artifice:
3712 the drbd code for Attach() will connect to the network if it
3713 finds a device which is connected to the good local disks but
3715 - wait for sync across all devices
3716 - remove all disks from the old secondary
3718 Failures are not very well handled.
3722 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3723 instance = self.instance
3725 vgname = self.cfg.GetVGName()
3728 old_node = self.tgt_node
3729 new_node = self.new_node
3730 pri_node = instance.primary_node
3732 # Step: check device activation
3733 self.proc.LogStep(1, steps_total, "check device existence")
3734 info("checking volume groups")
3735 my_vg = cfg.GetVGName()
3736 results = rpc.call_vg_list([pri_node, new_node])
3738 raise errors.OpExecError("Can't list volume groups on the nodes")
3739 for node in pri_node, new_node:
3740 res = results.get(node, False)
3741 if not res or my_vg not in res:
3742 raise errors.OpExecError("Volume group '%s' not found on %s" %
3744 for dev in instance.disks:
3745 if not dev.iv_name in self.op.disks:
3747 info("checking %s on %s" % (dev.iv_name, pri_node))
3748 cfg.SetDiskID(dev, pri_node)
3749 if not rpc.call_blockdev_find(pri_node, dev):
3750 raise errors.OpExecError("Can't find device %s on node %s" %
3751 (dev.iv_name, pri_node))
3753 # Step: check other node consistency
3754 self.proc.LogStep(2, steps_total, "check peer consistency")
3755 for dev in instance.disks:
3756 if not dev.iv_name in self.op.disks:
3758 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3759 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3760 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3761 " unsafe to replace the secondary" %
3764 # Step: create new storage
3765 self.proc.LogStep(3, steps_total, "allocate new storage")
3766 for dev in instance.disks:
3768 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3769 # since we *always* want to create this LV, we use the
3770 # _Create...OnPrimary (which forces the creation), even if we
3771 # are talking about the secondary node
3772 for new_lv in dev.children:
3773 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3774 _GetInstanceInfoText(instance)):
3775 raise errors.OpExecError("Failed to create new LV named '%s' on"
3777 (new_lv.logical_id[1], new_node))
3779 iv_names[dev.iv_name] = (dev, dev.children)
3781 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3782 for dev in instance.disks:
3784 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3785 # create new devices on new_node
3786 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3787 logical_id=(pri_node, new_node,
3789 children=dev.children)
3790 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3792 _GetInstanceInfoText(instance)):
3793 raise errors.OpExecError("Failed to create new DRBD on"
3794 " node '%s'" % new_node)
3796 for dev in instance.disks:
3797 # we have new devices, shutdown the drbd on the old secondary
3798 info("shutting down drbd for %s on old node" % dev.iv_name)
3799 cfg.SetDiskID(dev, old_node)
3800 if not rpc.call_blockdev_shutdown(old_node, dev):
3801 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3802 hint="Please cleanup this device manually as soon as possible")
3804 info("detaching primary drbds from the network (=> standalone)")
3806 for dev in instance.disks:
3807 cfg.SetDiskID(dev, pri_node)
3808 # set the physical (unique in bdev terms) id to None, meaning
3809 # detach from network
3810 dev.physical_id = (None,) * len(dev.physical_id)
3811 # and 'find' the device, which will 'fix' it to match the
3813 if rpc.call_blockdev_find(pri_node, dev):
3816 warning("Failed to detach drbd %s from network, unusual case" %
3820 # no detaches succeeded (very unlikely)
3821 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3823 # if we managed to detach at least one, we update all the disks of
3824 # the instance to point to the new secondary
3825 info("updating instance configuration")
3826 for dev in instance.disks:
3827 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3828 cfg.SetDiskID(dev, pri_node)
3829 cfg.Update(instance)
3831 # and now perform the drbd attach
3832 info("attaching primary drbds to new secondary (standalone => connected)")
3834 for dev in instance.disks:
3835 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3836 # since the attach is smart, it's enough to 'find' the device,
3837 # it will automatically activate the network, if the physical_id
3839 cfg.SetDiskID(dev, pri_node)
3840 if not rpc.call_blockdev_find(pri_node, dev):
3841 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3842 "please do a gnt-instance info to see the status of disks")
3844 # this can fail as the old devices are degraded and _WaitForSync
3845 # does a combined result over all disks, so we don't check its
3847 self.proc.LogStep(5, steps_total, "sync devices")
3848 _WaitForSync(cfg, instance, self.proc, unlock=True)
3850 # so check manually all the devices
3851 for name, (dev, old_lvs) in iv_names.iteritems():
3852 cfg.SetDiskID(dev, pri_node)
3853 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3855 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3857 self.proc.LogStep(6, steps_total, "removing old storage")
3858 for name, (dev, old_lvs) in iv_names.iteritems():
3859 info("remove logical volumes for %s" % name)
3861 cfg.SetDiskID(lv, old_node)
3862 if not rpc.call_blockdev_remove(old_node, lv):
3863 warning("Can't remove LV on old secondary",
3864 hint="Cleanup stale volumes by hand")
3866 def Exec(self, feedback_fn):
3867 """Execute disk replacement.
3869 This dispatches the disk replacement to the appropriate handler.
3872 instance = self.instance
3873 if instance.disk_template == constants.DT_REMOTE_RAID1:
3875 elif instance.disk_template == constants.DT_DRBD8:
3876 if self.op.remote_node is None:
3877 fn = self._ExecD8DiskOnly
3879 fn = self._ExecD8Secondary
3881 raise errors.ProgrammerError("Unhandled disk replacement case")
3882 return fn(feedback_fn)
3885 class LUQueryInstanceData(NoHooksLU):
3886 """Query runtime instance data.
3889 _OP_REQP = ["instances"]
3891 def CheckPrereq(self):
3892 """Check prerequisites.
3894 This only checks the optional instance list against the existing names.
3897 if not isinstance(self.op.instances, list):
3898 raise errors.OpPrereqError("Invalid argument type 'instances'")
3899 if self.op.instances:
3900 self.wanted_instances = []
3901 names = self.op.instances
3903 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3904 if instance is None:
3905 raise errors.OpPrereqError("No such instance name '%s'" % name)
3906 self.wanted_instances.append(instance)
3908 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3909 in self.cfg.GetInstanceList()]
3913 def _ComputeDiskStatus(self, instance, snode, dev):
3914 """Compute block device status.
3917 self.cfg.SetDiskID(dev, instance.primary_node)
3918 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3919 if dev.dev_type in constants.LDS_DRBD:
3920 # we change the snode then (otherwise we use the one passed in)
3921 if dev.logical_id[0] == instance.primary_node:
3922 snode = dev.logical_id[1]
3924 snode = dev.logical_id[0]
3927 self.cfg.SetDiskID(dev, snode)
3928 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3933 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3934 for child in dev.children]
3939 "iv_name": dev.iv_name,
3940 "dev_type": dev.dev_type,
3941 "logical_id": dev.logical_id,
3942 "physical_id": dev.physical_id,
3943 "pstatus": dev_pstatus,
3944 "sstatus": dev_sstatus,
3945 "children": dev_children,
3950 def Exec(self, feedback_fn):
3951 """Gather and return data"""
3953 for instance in self.wanted_instances:
3954 remote_info = rpc.call_instance_info(instance.primary_node,
3956 if remote_info and "state" in remote_info:
3959 remote_state = "down"
3960 if instance.status == "down":
3961 config_state = "down"
3965 disks = [self._ComputeDiskStatus(instance, None, device)
3966 for device in instance.disks]
3969 "name": instance.name,
3970 "config_state": config_state,
3971 "run_state": remote_state,
3972 "pnode": instance.primary_node,
3973 "snodes": instance.secondary_nodes,
3975 "memory": instance.memory,
3976 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3978 "network_port": instance.network_port,
3979 "vcpus": instance.vcpus,
3980 "kernel_path": instance.kernel_path,
3981 "initrd_path": instance.initrd_path,
3982 "hvm_boot_order": instance.hvm_boot_order,
3985 result[instance.name] = idict
3990 class LUSetInstanceParams(LogicalUnit):
3991 """Modifies an instances's parameters.
3994 HPATH = "instance-modify"
3995 HTYPE = constants.HTYPE_INSTANCE
3996 _OP_REQP = ["instance_name"]
3998 def BuildHooksEnv(self):
4001 This runs on the master, primary and secondaries.
4006 args['memory'] = self.mem
4008 args['vcpus'] = self.vcpus
4009 if self.do_ip or self.do_bridge or self.mac:
4013 ip = self.instance.nics[0].ip
4015 bridge = self.bridge
4017 bridge = self.instance.nics[0].bridge
4021 mac = self.instance.nics[0].mac
4022 args['nics'] = [(ip, bridge, mac)]
4023 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4024 nl = [self.sstore.GetMasterNode(),
4025 self.instance.primary_node] + list(self.instance.secondary_nodes)
4028 def CheckPrereq(self):
4029 """Check prerequisites.
4031 This only checks the instance list against the existing names.
4034 self.mem = getattr(self.op, "mem", None)
4035 self.vcpus = getattr(self.op, "vcpus", None)
4036 self.ip = getattr(self.op, "ip", None)
4037 self.mac = getattr(self.op, "mac", None)
4038 self.bridge = getattr(self.op, "bridge", None)
4039 self.kernel_path = getattr(self.op, "kernel_path", None)
4040 self.initrd_path = getattr(self.op, "initrd_path", None)
4041 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4042 all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4043 self.kernel_path, self.initrd_path, self.hvm_boot_order]
4044 if all_params.count(None) == len(all_params):
4045 raise errors.OpPrereqError("No changes submitted")
4046 if self.mem is not None:
4048 self.mem = int(self.mem)
4049 except ValueError, err:
4050 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4051 if self.vcpus is not None:
4053 self.vcpus = int(self.vcpus)
4054 except ValueError, err:
4055 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4056 if self.ip is not None:
4058 if self.ip.lower() == "none":
4061 if not utils.IsValidIP(self.ip):
4062 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4065 self.do_bridge = (self.bridge is not None)
4066 if self.mac is not None:
4067 if self.cfg.IsMacInUse(self.mac):
4068 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4070 if not utils.IsValidMac(self.mac):
4071 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4073 if self.kernel_path is not None:
4074 self.do_kernel_path = True
4075 if self.kernel_path == constants.VALUE_NONE:
4076 raise errors.OpPrereqError("Can't set instance to no kernel")
4078 if self.kernel_path != constants.VALUE_DEFAULT:
4079 if not os.path.isabs(self.kernel_path):
4080 raise errors.OpPrereqError("The kernel path must be an absolute"
4083 self.do_kernel_path = False
4085 if self.initrd_path is not None:
4086 self.do_initrd_path = True
4087 if self.initrd_path not in (constants.VALUE_NONE,
4088 constants.VALUE_DEFAULT):
4089 if not os.path.isabs(self.initrd_path):
4090 raise errors.OpPrereqError("The initrd path must be an absolute"
4093 self.do_initrd_path = False
4095 # boot order verification
4096 if self.hvm_boot_order is not None:
4097 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4098 if len(self.hvm_boot_order.strip("acdn")) != 0:
4099 raise errors.OpPrereqError("invalid boot order specified,"
4100 " must be one or more of [acdn]"
4103 instance = self.cfg.GetInstanceInfo(
4104 self.cfg.ExpandInstanceName(self.op.instance_name))
4105 if instance is None:
4106 raise errors.OpPrereqError("No such instance name '%s'" %
4107 self.op.instance_name)
4108 self.op.instance_name = instance.name
4109 self.instance = instance
4112 def Exec(self, feedback_fn):
4113 """Modifies an instance.
4115 All parameters take effect only at the next restart of the instance.
4118 instance = self.instance
4120 instance.memory = self.mem
4121 result.append(("mem", self.mem))
4123 instance.vcpus = self.vcpus
4124 result.append(("vcpus", self.vcpus))
4126 instance.nics[0].ip = self.ip
4127 result.append(("ip", self.ip))
4129 instance.nics[0].bridge = self.bridge
4130 result.append(("bridge", self.bridge))
4132 instance.nics[0].mac = self.mac
4133 result.append(("mac", self.mac))
4134 if self.do_kernel_path:
4135 instance.kernel_path = self.kernel_path
4136 result.append(("kernel_path", self.kernel_path))
4137 if self.do_initrd_path:
4138 instance.initrd_path = self.initrd_path
4139 result.append(("initrd_path", self.initrd_path))
4140 if self.hvm_boot_order:
4141 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4142 instance.hvm_boot_order = None
4144 instance.hvm_boot_order = self.hvm_boot_order
4145 result.append(("hvm_boot_order", self.hvm_boot_order))
4147 self.cfg.AddInstance(instance)
4152 class LUQueryExports(NoHooksLU):
4153 """Query the exports list
4158 def CheckPrereq(self):
4159 """Check that the nodelist contains only existing nodes.
4162 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4164 def Exec(self, feedback_fn):
4165 """Compute the list of all the exported system images.
4168 a dictionary with the structure node->(export-list)
4169 where export-list is a list of the instances exported on
4173 return rpc.call_export_list(self.nodes)
4176 class LUExportInstance(LogicalUnit):
4177 """Export an instance to an image in the cluster.
4180 HPATH = "instance-export"
4181 HTYPE = constants.HTYPE_INSTANCE
4182 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4184 def BuildHooksEnv(self):
4187 This will run on the master, primary node and target node.
4191 "EXPORT_NODE": self.op.target_node,
4192 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4194 env.update(_BuildInstanceHookEnvByObject(self.instance))
4195 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4196 self.op.target_node]
4199 def CheckPrereq(self):
4200 """Check prerequisites.
4202 This checks that the instance name is a valid one.
4205 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4206 self.instance = self.cfg.GetInstanceInfo(instance_name)
4207 if self.instance is None:
4208 raise errors.OpPrereqError("Instance '%s' not found" %
4209 self.op.instance_name)
4212 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4213 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4215 if self.dst_node is None:
4216 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4217 self.op.target_node)
4218 self.op.target_node = self.dst_node.name
4220 def Exec(self, feedback_fn):
4221 """Export an instance to an image in the cluster.
4224 instance = self.instance
4225 dst_node = self.dst_node
4226 src_node = instance.primary_node
4227 if self.op.shutdown:
4228 # shutdown the instance, but not the disks
4229 if not rpc.call_instance_shutdown(src_node, instance):
4230 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4231 (instance.name, source_node))
4233 vgname = self.cfg.GetVGName()
4238 for disk in instance.disks:
4239 if disk.iv_name == "sda":
4240 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4241 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4243 if not new_dev_name:
4244 logger.Error("could not snapshot block device %s on node %s" %
4245 (disk.logical_id[1], src_node))
4247 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4248 logical_id=(vgname, new_dev_name),
4249 physical_id=(vgname, new_dev_name),
4250 iv_name=disk.iv_name)
4251 snap_disks.append(new_dev)
4254 if self.op.shutdown and instance.status == "up":
4255 if not rpc.call_instance_start(src_node, instance, None):
4256 _ShutdownInstanceDisks(instance, self.cfg)
4257 raise errors.OpExecError("Could not start instance")
4259 # TODO: check for size
4261 for dev in snap_disks:
4262 if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
4264 logger.Error("could not export block device %s from node"
4266 (dev.logical_id[1], src_node, dst_node.name))
4267 if not rpc.call_blockdev_remove(src_node, dev):
4268 logger.Error("could not remove snapshot block device %s from"
4269 " node %s" % (dev.logical_id[1], src_node))
4271 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4272 logger.Error("could not finalize export for instance %s on node %s" %
4273 (instance.name, dst_node.name))
4275 nodelist = self.cfg.GetNodeList()
4276 nodelist.remove(dst_node.name)
4278 # on one-node clusters nodelist will be empty after the removal
4279 # if we proceed the backup would be removed because OpQueryExports
4280 # substitutes an empty list with the full cluster node list.
4282 op = opcodes.OpQueryExports(nodes=nodelist)
4283 exportlist = self.proc.ChainOpCode(op)
4284 for node in exportlist:
4285 if instance.name in exportlist[node]:
4286 if not rpc.call_export_remove(node, instance.name):
4287 logger.Error("could not remove older export for instance %s"
4288 " on node %s" % (instance.name, node))
4291 class TagsLU(NoHooksLU):
4294 This is an abstract class which is the parent of all the other tags LUs.
4297 def CheckPrereq(self):
4298 """Check prerequisites.
4301 if self.op.kind == constants.TAG_CLUSTER:
4302 self.target = self.cfg.GetClusterInfo()
4303 elif self.op.kind == constants.TAG_NODE:
4304 name = self.cfg.ExpandNodeName(self.op.name)
4306 raise errors.OpPrereqError("Invalid node name (%s)" %
4309 self.target = self.cfg.GetNodeInfo(name)
4310 elif self.op.kind == constants.TAG_INSTANCE:
4311 name = self.cfg.ExpandInstanceName(self.op.name)
4313 raise errors.OpPrereqError("Invalid instance name (%s)" %
4316 self.target = self.cfg.GetInstanceInfo(name)
4318 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4322 class LUGetTags(TagsLU):
4323 """Returns the tags of a given object.
4326 _OP_REQP = ["kind", "name"]
4328 def Exec(self, feedback_fn):
4329 """Returns the tag list.
4332 return self.target.GetTags()
4335 class LUSearchTags(NoHooksLU):
4336 """Searches the tags for a given pattern.
4339 _OP_REQP = ["pattern"]
4341 def CheckPrereq(self):
4342 """Check prerequisites.
4344 This checks the pattern passed for validity by compiling it.
4348 self.re = re.compile(self.op.pattern)
4349 except re.error, err:
4350 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4351 (self.op.pattern, err))
4353 def Exec(self, feedback_fn):
4354 """Returns the tag list.
4358 tgts = [("/cluster", cfg.GetClusterInfo())]
4359 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4360 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4361 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4362 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4364 for path, target in tgts:
4365 for tag in target.GetTags():
4366 if self.re.search(tag):
4367 results.append((path, tag))
4371 class LUAddTags(TagsLU):
4372 """Sets a tag on a given object.
4375 _OP_REQP = ["kind", "name", "tags"]
4377 def CheckPrereq(self):
4378 """Check prerequisites.
4380 This checks the type and length of the tag name and value.
4383 TagsLU.CheckPrereq(self)
4384 for tag in self.op.tags:
4385 objects.TaggableObject.ValidateTag(tag)
4387 def Exec(self, feedback_fn):
4392 for tag in self.op.tags:
4393 self.target.AddTag(tag)
4394 except errors.TagError, err:
4395 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4397 self.cfg.Update(self.target)
4398 except errors.ConfigurationError:
4399 raise errors.OpRetryError("There has been a modification to the"
4400 " config file and the operation has been"
4401 " aborted. Please retry.")
4404 class LUDelTags(TagsLU):
4405 """Delete a list of tags from a given object.
4408 _OP_REQP = ["kind", "name", "tags"]
4410 def CheckPrereq(self):
4411 """Check prerequisites.
4413 This checks that we have the given tag.
4416 TagsLU.CheckPrereq(self)
4417 for tag in self.op.tags:
4418 objects.TaggableObject.ValidateTag(tag)
4419 del_tags = frozenset(self.op.tags)
4420 cur_tags = self.target.GetTags()
4421 if not del_tags <= cur_tags:
4422 diff_tags = del_tags - cur_tags
4423 diff_names = ["'%s'" % tag for tag in diff_tags]
4425 raise errors.OpPrereqError("Tag(s) %s not found" %
4426 (",".join(diff_names)))
4428 def Exec(self, feedback_fn):
4429 """Remove the tag from the object.
4432 for tag in self.op.tags:
4433 self.target.RemoveTag(tag)
4435 self.cfg.Update(self.target)
4436 except errors.ConfigurationError:
4437 raise errors.OpRetryError("There has been a modification to the"
4438 " config file and the operation has been"
4439 " aborted. Please retry.")
4441 class LUTestDelay(NoHooksLU):
4442 """Sleep for a specified amount of time.
4444 This LU sleeps on the master and/or nodes for a specified amoutn of
4448 _OP_REQP = ["duration", "on_master", "on_nodes"]
4450 def CheckPrereq(self):
4451 """Check prerequisites.
4453 This checks that we have a good list of nodes and/or the duration
4458 if self.op.on_nodes:
4459 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4461 def Exec(self, feedback_fn):
4462 """Do the actual sleep.
4465 if self.op.on_master:
4466 if not utils.TestDelay(self.op.duration):
4467 raise errors.OpExecError("Error during master delay test")
4468 if self.op.on_nodes:
4469 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4471 raise errors.OpExecError("Complete failure from rpc call")
4472 for node, node_result in result.items():
4474 raise errors.OpExecError("Failure during rpc call to node %s,"
4475 " result: %s" % (node, node_result))