4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
# pylint: disable-msg=W0613,W0201

import os
import os.path
import sha
import re
import time
import tempfile
import socket
import platform
35 from ganeti import rpc
36 from ganeti import ssh
37 from ganeti import logger
38 from ganeti import utils
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
47 class LogicalUnit(object):
48 """Logical Unit base class.
50 Subclasses must follow these rules:
51 - implement CheckPrereq which also fills in the opcode instance
52 with all the fields (even if as None)
54 - implement BuildHooksEnv
55 - redefine HPATH and HTYPE
56 - optionally redefine their run requirements (REQ_CLUSTER,
57 REQ_MASTER); note that all commands require root permissions
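
  A minimal illustrative subclass (a sketch only; LUDoNothing and its
  behaviour are hypothetical, not part of this module) might look like:

    class LUDoNothing(LogicalUnit):
      HPATH = "do-nothing"
      HTYPE = constants.HTYPE_CLUSTER
      _OP_REQP = []

      def CheckPrereq(self):
        pass

      def BuildHooksEnv(self):
        return {}, [], []

      def Exec(self, feedback_fn):
        feedback_fn("doing nothing")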
66 def __init__(self, processor, op, cfg, sstore):
67 """Constructor for LogicalUnit.
    This needs to be overridden in derived classes in order to check op
    validity.
73 self.processor = processor
77 for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)
83 if not cfg.IsCluster():
84 raise errors.OpPrereqError("Cluster not initialized yet,"
85 " use 'gnt-cluster init' first.")
87 master = sstore.GetMasterNode()
88 if master != socket.gethostname():
      raise errors.OpPrereqError("Commands must be run on the master"
                                 " node %s" % master)
92 def CheckPrereq(self):
93 """Check prerequisites for this LU.
95 This method should check that the prerequisites for the execution
96 of this LU are fulfilled. It can do internode communication, but
97 it should be idempotent - no cluster or system changes are
100 The method should raise errors.OpPrereqError in case something is
101 not fulfilled. Its return value is ignored.
103 This method should also update all the parameters of the opcode to
104 their canonical form; e.g. a short node name must be fully
105 expanded after this method has successfully completed (so that
106 hooks, logging, etc. work correctly).
109 raise NotImplementedError
111 def Exec(self, feedback_fn):
114 This method should implement the actual work. It should raise
115 errors.OpExecError for failures that are somewhat dealt with in
119 raise NotImplementedError
121 def BuildHooksEnv(self):
122 """Build hooks environment for this LU.
    This method should return a three-element tuple consisting of: a dict
125 containing the environment that will be used for running the
126 specific hook for this LU, a list of node names on which the hook
127 should run before the execution, and a list of node names on which
128 the hook should run after the execution.
130 The keys of the dict must not have 'GANETI_' prefixed as this will
131 be handled in the hooks runner. Also note additional keys will be
132 added by the hooks runner. If the LU doesn't define any
133 environment, an empty dict (and not None) should be returned.
    As for the node lists, the master should not be included in
136 them, as it will be added by the hooks runner in case this LU
137 requires a cluster to run on (otherwise we don't have a node
    list). No nodes should be returned as an empty list (and not
    None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.
145 raise NotImplementedError
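
  # An illustrative BuildHooksEnv for a per-node LU (a sketch modelled
  # on LURemoveNode below, not a required pattern):
  #
  #   def BuildHooksEnv(self):
  #     env = {"NODE_NAME": self.op.node_name}
  #     all_nodes = self.cfg.GetNodeList()
  #     return env, all_nodes, all_nodes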
148 class NoHooksLU(LogicalUnit):
149 """Simple LU which runs no hooks.
151 This LU is intended as a parent for other LogicalUnits which will
152 run no hooks, in order to reduce duplicate code.
158 def BuildHooksEnv(self):
161 This is a no-op, since we don't run hooks.
167 def _GetWantedNodes(lu, nodes):
168 """Returns list of checked and expanded nodes.
171 nodes: List of nodes (strings) or None for all
174 if nodes is not None and not isinstance(nodes, list):
175 raise errors.OpPrereqError("Invalid argument type 'nodes'")
      node = lu.cfg.GetNodeInfo(lu.cfg.ExpandNodeName(name))
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
184 wanted_nodes.append(node)
188 return [lu.cfg.GetNodeInfo(name) for name in lu.cfg.GetNodeList()]
191 def _CheckOutputFields(static, dynamic, selected):
192 """Checks whether all selected fields are valid.
195 static: Static fields
196 dynamic: Dynamic fields
199 static_fields = frozenset(static)
200 dynamic_fields = frozenset(dynamic)
202 all_fields = static_fields | dynamic_fields
204 if not all_fields.issuperset(selected):
205 raise errors.OpPrereqError("Unknown output fields selected: %s"
206 % ",".join(frozenset(selected).
207 difference(all_fields)))
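
# Illustrative use (a sketch mirroring the query LUs below): selecting a
# field outside static|dynamic raises OpPrereqError:
#
#   _CheckOutputFields(static=["name"], dynamic=["mfree"],
#                      selected=["name", "bogus"])
#   # -> OpPrereqError: Unknown output fields selected: bogus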
210 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
211 memory, vcpus, nics):
212 """Builds instance related env variables for hooks from single variables.
215 secondary_nodes: List of secondary nodes as strings
218 "INSTANCE_NAME": name,
219 "INSTANCE_PRIMARY": primary_node,
220 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
221 "INSTANCE_OS_TYPE": os_type,
222 "INSTANCE_STATUS": status,
223 "INSTANCE_MEMORY": memory,
224 "INSTANCE_VCPUS": vcpus,
228 nic_count = len(nics)
229 for idx, (ip, bridge) in enumerate(nics):
232 env["INSTANCE_NIC%d_IP" % idx] = ip
233 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
237 env["INSTANCE_NIC_COUNT"] = nic_count
242 def _BuildInstanceHookEnvByObject(instance, override=None):
243 """Builds instance related env variables for hooks from an object.
246 instance: objects.Instance object of instance
247 override: dict of values to override
250 'name': instance.name,
251 'primary_node': instance.primary_node,
252 'secondary_nodes': instance.secondary_nodes,
253 'os_type': instance.os,
    'status': instance.status,
255 'memory': instance.memory,
256 'vcpus': instance.vcpus,
257 'nics': [(nic.ip, nic.bridge) for nic in instance.nics],
260 args.update(override)
261 return _BuildInstanceHookEnv(**args)
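
# For an instance with a single NIC, the helpers above produce hook
# environment keys like the following (illustrative; as noted in
# BuildHooksEnv, the hooks runner later prefixes each key with 'GANETI_'):
#
#   INSTANCE_NAME, INSTANCE_PRIMARY, INSTANCE_SECONDARIES,
#   INSTANCE_OS_TYPE, INSTANCE_STATUS, INSTANCE_MEMORY, INSTANCE_VCPUS,
#   INSTANCE_NIC0_IP, INSTANCE_NIC0_BRIDGE, INSTANCE_NIC_COUNT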
264 def _UpdateEtcHosts(fullnode, ip):
265 """Ensure a node has a correct entry in /etc/hosts.
268 fullnode - Fully qualified domain name of host. (str)
269 ip - IPv4 address of host (str)
272 node = fullnode.split(".", 1)[0]
274 f = open('/etc/hosts', 'r+')
283 rawline = f.readline()
289 line = rawline.split('\n')[0]
292 line = line.split('#')[0]
295 # Entire line was comment, skip
296 save_lines.append(rawline)
299 fields = line.split()
303 for spec in [ ip, fullnode, node ]:
304 if spec not in fields:
311 save_lines.append(rawline)
314 if havesome and not haveall:
315 # Line (old, or manual?) which is missing some. Remove.
319 save_lines.append(rawline)
322 add_lines.append('%s\t%s %s\n' % (ip, fullnode, node))
326 save_lines = save_lines + add_lines
328 # We removed a line, write a new file and replace old.
329 fd, tmpname = tempfile.mkstemp('tmp', 'hosts_', '/etc')
330 newfile = os.fdopen(fd, 'w')
331 newfile.write(''.join(save_lines))
333 os.rename(tmpname, '/etc/hosts')
336 # Simply appending a new line will do the trick.
338 for add in add_lines:
344 def _UpdateKnownHosts(fullnode, ip, pubkey):
345 """Ensure a node has a correct known_hosts entry.
348 fullnode - Fully qualified domain name of host. (str)
349 ip - IPv4 address of host (str)
350 pubkey - the public key of the cluster
353 if os.path.exists('/etc/ssh/ssh_known_hosts'):
354 f = open('/etc/ssh/ssh_known_hosts', 'r+')
356 f = open('/etc/ssh/ssh_known_hosts', 'w+')
365 rawline = f.readline()
366 logger.Debug('read %s' % (repr(rawline),))
372 line = rawline.split('\n')[0]
374 parts = line.split(' ')
375 fields = parts[0].split(',')
380 for spec in [ ip, fullnode ]:
381 if spec not in fields:
386 logger.Debug("key, pubkey = %s." % (repr((key, pubkey)),))
387 if haveall and key == pubkey:
389 save_lines.append(rawline)
390 logger.Debug("Keeping known_hosts '%s'." % (repr(rawline),))
393 if havesome and (not haveall or key != pubkey):
395 logger.Debug("Discarding known_hosts '%s'." % (repr(rawline),))
398 save_lines.append(rawline)
401 add_lines.append('%s,%s ssh-rsa %s\n' % (fullnode, ip, pubkey))
402 logger.Debug("Adding known_hosts '%s'." % (repr(add_lines[-1]),))
405 save_lines = save_lines + add_lines
407 # Write a new file and replace old.
408 fd, tmpname = tempfile.mkstemp('tmp', 'ssh_known_hosts_', '/etc/ssh')
409 newfile = os.fdopen(fd, 'w')
410 newfile.write(''.join(save_lines))
412 logger.Debug("Wrote new known_hosts.")
413 os.rename(tmpname, '/etc/ssh/ssh_known_hosts')
416 # Simply appending a new line will do the trick.
418 for add in add_lines:
424 def _HasValidVG(vglist, vgname):
425 """Checks if the volume group list is valid.
427 A non-None return value means there's an error, and the return value
428 is the error message.
  vgsize = vglist.get(vgname, None)
  if vgsize is None:
    return "volume group '%s' missing" % vgname
  elif vgsize < 20480:
    return ("volume group '%s' too small (20480MiB required, %dMiB found)" %
            (vgname, vgsize))
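
# Typical caller pattern (a sketch; mirrors LUInitCluster.CheckPrereq
# below, "xenvg" being an illustrative VG name) -- a None result means
# the volume group exists and is large enough:
#
#   vgstatus = _HasValidVG(utils.ListVolumeGroups(), "xenvg")
#   if vgstatus:
#     raise errors.OpPrereqError("Error: %s" % vgstatus)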
440 def _InitSSHSetup(node):
441 """Setup the SSH configuration for the cluster.
444 This generates a dsa keypair for root, adds the pub key to the
445 permitted hosts and adds the hostkey to its own known hosts.
448 node: the name of this host as a fqdn
451 utils.RemoveFile('/root/.ssh/known_hosts')
453 if os.path.exists('/root/.ssh/id_dsa'):
454 utils.CreateBackup('/root/.ssh/id_dsa')
455 if os.path.exists('/root/.ssh/id_dsa.pub'):
456 utils.CreateBackup('/root/.ssh/id_dsa.pub')
458 utils.RemoveFile('/root/.ssh/id_dsa')
459 utils.RemoveFile('/root/.ssh/id_dsa.pub')
461 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
462 "-f", "/root/.ssh/id_dsa",
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)
468 f = open('/root/.ssh/id_dsa.pub', 'r')
470 utils.AddAuthorizedKey('/root/.ssh/authorized_keys', f.read(8192))
475 def _InitGanetiServerSetup(ss):
476 """Setup the necessary configuration for the initial node daemon.
478 This creates the nodepass file containing the shared password for
479 the cluster and also generates the SSL certificate.
482 # Create pseudo random password
483 randpass = sha.new(os.urandom(64)).hexdigest()
484 # and write it into sstore
485 ss.SetKey(ss.SS_NODED_PASS, randpass)
487 result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
488 "-days", str(365*5), "-nodes", "-x509",
489 "-keyout", constants.SSL_CERT_FILE,
490 "-out", constants.SSL_CERT_FILE, "-batch"])
  if result.failed:
    raise errors.OpExecError("could not generate server ssl cert, command"
                             " %s had exitcode %s and error message %s" %
                             (result.cmd, result.exit_code, result.output))
496 os.chmod(constants.SSL_CERT_FILE, 0400)
498 result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))
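
# The generated self-signed certificate can be inspected by hand if
# needed (an illustrative command; the actual path is whatever
# constants.SSL_CERT_FILE points to):
#
#   openssl x509 -in /var/lib/ganeti/server.pem -noout -subject -dates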
506 class LUInitCluster(LogicalUnit):
507 """Initialise the cluster.
510 HPATH = "cluster-init"
511 HTYPE = constants.HTYPE_CLUSTER
512 _OP_REQP = ["cluster_name", "hypervisor_type", "vg_name", "mac_prefix",
513 "def_bridge", "master_netdev"]
516 def BuildHooksEnv(self):
519 Notes: Since we don't require a cluster, we must manually add
520 ourselves in the post-run node list.
524 "CLUSTER": self.op.cluster_name,
525 "MASTER": self.hostname['hostname_full'],
527 return env, [], [self.hostname['hostname_full']]
529 def CheckPrereq(self):
530 """Verify that the passed name is a valid one.
533 if config.ConfigWriter.IsCluster():
534 raise errors.OpPrereqError("Cluster is already initialised")
536 hostname_local = socket.gethostname()
    self.hostname = hostname = utils.LookupHostname(hostname_local)
    if not hostname:
      raise errors.OpPrereqError("Cannot resolve my own hostname ('%s')" %
                                 hostname_local)
    self.clustername = clustername = utils.LookupHostname(self.op.cluster_name)
    if not clustername:
      raise errors.OpPrereqError("Cannot resolve given cluster name ('%s')"
                                 % self.op.cluster_name)
    result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", hostname['ip']])
    if result.failed:
      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                                 " to %s,\nbut this ip address does not"
                                 " belong to this host."
                                 " Aborting." % hostname['ip'])
554 secondary_ip = getattr(self.op, "secondary_ip", None)
555 if secondary_ip and not utils.IsValidIP(secondary_ip):
556 raise errors.OpPrereqError("Invalid secondary ip given")
557 if secondary_ip and secondary_ip != hostname['ip']:
      result = utils.RunCmd(["fping", "-S127.0.0.1", "-q", secondary_ip])
      if result.failed:
        raise errors.OpPrereqError("You gave %s as secondary IP,\n"
                                   "but it does not belong to this host." %
                                   secondary_ip)
    self.secondary_ip = secondary_ip
565 # checks presence of the volume group given
    vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus)
    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
                    self.op.mac_prefix):
      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
                                 self.op.mac_prefix)
576 if self.op.hypervisor_type not in hypervisor.VALID_HTYPES:
577 raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
578 self.op.hypervisor_type)
    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (self.op.master_netdev,
                                  result.output.strip()))
586 def Exec(self, feedback_fn):
587 """Initialize the cluster.
590 clustername = self.clustername
591 hostname = self.hostname
593 # set up the simple store
594 ss = ssconf.SimpleStore()
595 ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
596 ss.SetKey(ss.SS_MASTER_NODE, hostname['hostname_full'])
597 ss.SetKey(ss.SS_MASTER_IP, clustername['ip'])
598 ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
599 ss.SetKey(ss.SS_CLUSTER_NAME, clustername['hostname'])
601 # set up the inter-node password and certificate
602 _InitGanetiServerSetup(ss)
604 # start the master ip
605 rpc.call_node_start_master(hostname['hostname_full'])
607 # set up ssh config and /etc/hosts
608 f = open('/etc/ssh/ssh_host_rsa_key.pub', 'r')
613 sshkey = sshline.split(" ")[1]
615 _UpdateEtcHosts(hostname['hostname_full'],
619 _UpdateKnownHosts(hostname['hostname_full'],
624 _InitSSHSetup(hostname['hostname'])
626 # init of cluster config file
627 cfgw = config.ConfigWriter()
628 cfgw.InitConfig(hostname['hostname'], hostname['ip'], self.secondary_ip,
629 sshkey, self.op.mac_prefix,
630 self.op.vg_name, self.op.def_bridge)
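
# Illustrative submission of this LU (a sketch: OpInitCluster is assumed
# to be the matching opcode defined in opcodes.py, and all field values
# here are made up):
#
#   op = opcodes.OpInitCluster(cluster_name="cluster.example.com",
#                              hypervisor_type="xen-3.0", vg_name="xenvg",
#                              mac_prefix="aa:00:00", def_bridge="xen-br0",
#                              master_netdev="eth0")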
633 class LUDestroyCluster(NoHooksLU):
634 """Logical unit for destroying the cluster.
639 def CheckPrereq(self):
640 """Check prerequisites.
642 This checks whether the cluster is empty.
644 Any errors are signalled by raising errors.OpPrereqError.
647 master = self.sstore.GetMasterNode()
649 nodelist = self.cfg.GetNodeList()
650 if len(nodelist) != 1 or nodelist[0] != master:
651 raise errors.OpPrereqError("There are still %d node(s) in"
652 " this cluster." % (len(nodelist) - 1))
653 instancelist = self.cfg.GetInstanceList()
655 raise errors.OpPrereqError("There are still %d instance(s) in"
656 " this cluster." % len(instancelist))
658 def Exec(self, feedback_fn):
659 """Destroys the cluster.
662 utils.CreateBackup('/root/.ssh/id_dsa')
663 utils.CreateBackup('/root/.ssh/id_dsa.pub')
664 rpc.call_node_leave_cluster(self.sstore.GetMasterNode())
667 class LUVerifyCluster(NoHooksLU):
668 """Verifies the cluster status.
673 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
674 remote_version, feedback_fn):
675 """Run multiple tests against a node.
678 - compares ganeti version
    - checks vg existence and size > 20G
680 - checks config file checksum
681 - checks ssh to other nodes
684 node: name of the node to check
685 file_list: required list of files
686 local_cksum: dictionary of local files and their checksums
689 # compares ganeti version
690 local_version = constants.PROTOCOL_VERSION
691 if not remote_version:
692 feedback_fn(" - ERROR: connection to %s failed" % (node))
695 if local_version != remote_version:
696 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
697 (local_version, node, remote_version))
    # checks vg existence and size > 20G
704 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
708 vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
710 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
713 # checks config file checksum
716 if 'filelist' not in node_result:
718 feedback_fn(" - ERROR: node hasn't returned file checksum data")
720 remote_cksum = node_result['filelist']
721 for file_name in file_list:
722 if file_name not in remote_cksum:
724 feedback_fn(" - ERROR: file '%s' missing" % file_name)
725 elif remote_cksum[file_name] != local_cksum[file_name]:
727 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
729 if 'nodelist' not in node_result:
731 feedback_fn(" - ERROR: node hasn't returned node connectivity data")
733 if node_result['nodelist']:
735 for node in node_result['nodelist']:
736 feedback_fn(" - ERROR: communication with node '%s': %s" %
737 (node, node_result['nodelist'][node]))
738 hyp_result = node_result.get('hypervisor', None)
739 if hyp_result is not None:
740 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
743 def _VerifyInstance(self, instance, node_vol_is, node_instance, feedback_fn):
744 """Verify an instance.
746 This function checks to see if the required block devices are
747 available on the instance's node.
752 instancelist = self.cfg.GetInstanceList()
    if instance not in instancelist:
754 feedback_fn(" - ERROR: instance %s not in instance list %s" %
755 (instance, instancelist))
758 instanceconfig = self.cfg.GetInstanceInfo(instance)
759 node_current = instanceconfig.primary_node
762 instanceconfig.MapLVsByNode(node_vol_should)
764 for node in node_vol_should:
765 for volume in node_vol_should[node]:
766 if node not in node_vol_is or volume not in node_vol_is[node]:
767 feedback_fn(" - ERROR: volume %s missing on node %s" %
    if instanceconfig.status != 'down':
      if instance not in node_instance[node_current]:
773 feedback_fn(" - ERROR: instance %s not running on node %s" %
774 (instance, node_current))
777 for node in node_instance:
      if node != node_current:
779 if instance in node_instance[node]:
780 feedback_fn(" - ERROR: instance %s should not run on node %s" %
786 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
787 """Verify if there are any unknown volumes in the cluster.
    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.
795 for node in node_vol_is:
796 for volume in node_vol_is[node]:
797 if node not in node_vol_should or volume not in node_vol_should[node]:
798 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
803 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
804 """Verify the list of running instances.
806 This checks what instances are running but unknown to the cluster.
810 for node in node_instance:
811 for runninginstance in node_instance[node]:
812 if runninginstance not in instancelist:
813 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
814 (runninginstance, node))
818 def CheckPrereq(self):
819 """Check prerequisites.
821 This has no prerequisites.
826 def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.
831 feedback_fn("* Verifying global settings")
832 self.cfg.VerifyConfig()
834 master = self.sstore.GetMasterNode()
835 vg_name = self.cfg.GetVGName()
836 nodelist = utils.NiceSort(self.cfg.GetNodeList())
837 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
841 # FIXME: verify OS list
843 file_names = list(self.sstore.GetFileList())
844 file_names.append(constants.SSL_CERT_FILE)
845 file_names.append(constants.CLUSTER_CONF_FILE)
846 local_checksums = utils.FingerprintFiles(file_names)
848 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
849 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
850 all_instanceinfo = rpc.call_instance_list(nodelist)
851 all_vglist = rpc.call_vg_list(nodelist)
852 node_verify_param = {
853 'filelist': file_names,
854 'nodelist': nodelist,
857 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
858 all_rversion = rpc.call_version(nodelist)
860 for node in nodelist:
861 feedback_fn("* Verifying node %s" % node)
862 result = self._VerifyNode(node, file_names, local_checksums,
863 all_vglist[node], all_nvinfo[node],
864 all_rversion[node], feedback_fn)
868 volumeinfo = all_volumeinfo[node]
      if not isinstance(volumeinfo, dict):
871 feedback_fn(" - ERROR: connection to %s failed" % (node,))
875 node_volume[node] = volumeinfo
878 nodeinstance = all_instanceinfo[node]
      if not isinstance(nodeinstance, list):
880 feedback_fn(" - ERROR: connection to %s failed" % (node,))
884 node_instance[node] = nodeinstance
888 for instance in instancelist:
889 feedback_fn("* Verifying instance %s" % instance)
890 result = self._VerifyInstance(instance, node_volume, node_instance,
894 inst_config = self.cfg.GetInstanceInfo(instance)
896 inst_config.MapLVsByNode(node_vol_should)
898 feedback_fn("* Verifying orphan volumes")
899 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
903 feedback_fn("* Verifying remaining instances")
904 result = self._VerifyOrphanInstances(instancelist, node_instance,
911 def _WaitForSync(cfgw, instance, oneshot=False, unlock=False):
912 """Sleep and poll for an instance's disk to sync.
915 if not instance.disks:
919 logger.ToStdout("Waiting for instance %s to sync disks." % instance.name)
921 node = instance.primary_node
923 for dev in instance.disks:
924 cfgw.SetDiskID(dev, node)
930 cumul_degraded = False
931 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
933 logger.ToStderr("Can't get any data from node %s" % node)
936 raise errors.RemoteError("Can't contact node %s for mirror data,"
941 for i in range(len(rstats)):
944 logger.ToStderr("Can't compute data for node %s/%s" %
945 (node, instance.disks[i].iv_name))
947 perc_done, est_time, is_degraded = mstat
948 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
949 if perc_done is not None:
951 if est_time is not None:
952 rem_time = "%d estimated seconds remaining" % est_time
955 rem_time = "no time estimate"
956 logger.ToStdout("- device %s: %5.2f%% done, %s" %
957 (instance.disks[i].iv_name, perc_done, rem_time))
964 time.sleep(min(60, max_time))
970 logger.ToStdout("Instance %s's disks are in sync." % instance.name)
971 return not cumul_degraded
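
# Callers typically block on this helper and treat a False result as a
# degraded mirror (a sketch; the exact error text is illustrative):
#
#   if not _WaitForSync(self.cfg, instance):
#     raise errors.OpExecError("Disk sync-ing has not returned a good status")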
974 def _CheckDiskConsistency(cfgw, dev, node, on_primary):
975 """Check that mirrors are not degraded.
978 cfgw.SetDiskID(dev, node)
981 if on_primary or dev.AssembleOnSecondary():
982 rstats = rpc.call_blockdev_find(node, dev)
984 logger.ToStderr("Can't get any data from node %s" % node)
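    # the result of rpc.call_blockdev_find is a status tuple; index 5 is
    # assumed here to be the device's is_degraded flag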
987 result = result and (not rstats[5])
989 for child in dev.children:
990 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
995 class LUDiagnoseOS(NoHooksLU):
996 """Logical unit for OS diagnose/query.
1001 def CheckPrereq(self):
1002 """Check prerequisites.
1004 This always succeeds, since this is a pure query LU.
1009 def Exec(self, feedback_fn):
1010 """Compute the list of OSes.
1013 node_list = self.cfg.GetNodeList()
1014 node_data = rpc.call_os_diagnose(node_list)
1015 if node_data == False:
1016 raise errors.OpExecError("Can't gather the list of OSes")
1020 class LURemoveNode(LogicalUnit):
1021 """Logical unit for removing a node.
1024 HPATH = "node-remove"
1025 HTYPE = constants.HTYPE_NODE
1026 _OP_REQP = ["node_name"]
1028 def BuildHooksEnv(self):
1031 This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.
1036 "NODE_NAME": self.op.node_name,
1038 all_nodes = self.cfg.GetNodeList()
1039 all_nodes.remove(self.op.node_name)
1040 return env, all_nodes, all_nodes
1042 def CheckPrereq(self):
1043 """Check prerequisites.
1046 - the node exists in the configuration
1047 - it does not have primary or secondary instances
1048 - it's not the master
1050 Any errors are signalled by raising errors.OpPrereqError.
1053 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1055 logger.Error("Error: Node '%s' is unknown." % self.op.node_name)
1058 instance_list = self.cfg.GetInstanceList()
1060 masternode = self.sstore.GetMasterNode()
1061 if node.name == masternode:
1062 raise errors.OpPrereqError("Node is the master node,"
1063 " you need to failover first.")
1065 for instance_name in instance_list:
1066 instance = self.cfg.GetInstanceInfo(instance_name)
1067 if node.name == instance.primary_node:
1068 raise errors.OpPrereqError("Instance %s still running on the node,"
1069 " please remove first." % instance_name)
1070 if node.name in instance.secondary_nodes:
1071 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1072 " please remove first." % instance_name)
1073 self.op.node_name = node.name
1076 def Exec(self, feedback_fn):
1077 """Removes the node from the cluster.
1081 logger.Info("stopping the node daemon and removing configs from node %s" %
1084 rpc.call_node_leave_cluster(node.name)
1086 ssh.SSHCall(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1088 logger.Info("Removing node %s from config" % node.name)
1090 self.cfg.RemoveNode(node.name)
1093 class LUQueryNodes(NoHooksLU):
1094 """Logical unit for querying nodes.
1097 _OP_REQP = ["output_fields"]
1099 def CheckPrereq(self):
1100 """Check prerequisites.
1102 This checks that the fields required are valid output fields.
1105 self.dynamic_fields = frozenset(["dtotal", "dfree",
1106 "mtotal", "mnode", "mfree"])
1108 _CheckOutputFields(static=["name", "pinst", "sinst", "pip", "sip"],
1109 dynamic=self.dynamic_fields,
1110 selected=self.op.output_fields)
1113 def Exec(self, feedback_fn):
1114 """Computes the list of nodes and their attributes.
1117 nodenames = utils.NiceSort(self.cfg.GetNodeList())
1118 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1121 # begin data gathering
1123 if self.dynamic_fields.intersection(self.op.output_fields):
1125 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1126 for name in nodenames:
1127 nodeinfo = node_data.get(name, None)
1130 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1131 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1132 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1133 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1134 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1137 live_data[name] = {}
      live_data = dict([(name, {}) for name in nodenames])
1141 node_to_primary = dict.fromkeys(nodenames, 0)
1142 node_to_secondary = dict.fromkeys(nodenames, 0)
1144 if "pinst" in self.op.output_fields or "sinst" in self.op.output_fields:
1145 instancelist = self.cfg.GetInstanceList()
1147 for instance in instancelist:
1148 instanceinfo = self.cfg.GetInstanceInfo(instance)
1149 node_to_primary[instanceinfo.primary_node] += 1
1150 for secnode in instanceinfo.secondary_nodes:
1151 node_to_secondary[secnode] += 1
1153 # end data gathering
1156 for node in nodelist:
1158 for field in self.op.output_fields:
1161 elif field == "pinst":
1162 val = node_to_primary[node.name]
1163 elif field == "sinst":
1164 val = node_to_secondary[node.name]
1165 elif field == "pip":
1166 val = node.primary_ip
1167 elif field == "sip":
1168 val = node.secondary_ip
1169 elif field in self.dynamic_fields:
1170 val = live_data[node.name].get(field, "?")
1172 raise errors.ParameterError(field)
1174 node_output.append(val)
1175 output.append(node_output)
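
    # 'output' holds one row per node with one column per requested
    # field; e.g. output_fields=["name", "pinst"] might yield
    # [["node1.example.com", 2], ["node2.example.com", 0]] (values
    # illustrative)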
1180 class LUQueryNodeVolumes(NoHooksLU):
1181 """Logical unit for getting volumes on node(s).
1184 _OP_REQP = ["nodes", "output_fields"]
1186 def CheckPrereq(self):
1187 """Check prerequisites.
1189 This checks that the fields required are valid output fields.
1192 self.nodes = _GetWantedNodes(self, self.op.nodes)
1194 _CheckOutputFields(static=["node"],
1195 dynamic=["phys", "vg", "name", "size", "instance"],
1196 selected=self.op.output_fields)
1199 def Exec(self, feedback_fn):
    """Computes the list of volumes and their attributes.
1203 nodenames = utils.NiceSort([node.name for node in self.nodes])
1204 volumes = rpc.call_node_volumes(nodenames)
1206 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1207 in self.cfg.GetInstanceList()]
1209 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1212 for node in nodenames:
1213 if node not in volumes or not volumes[node]:
1216 node_vols = volumes[node][:]
1217 node_vols.sort(key=lambda vol: vol['dev'])
1219 for vol in node_vols:
1221 for field in self.op.output_fields:
1224 elif field == "phys":
1228 elif field == "name":
1230 elif field == "size":
1231 val = int(float(vol['size']))
1232 elif field == "instance":
1234 if node not in lv_by_node[inst]:
1236 if vol['name'] in lv_by_node[inst][node]:
1242 raise errors.ParameterError(field)
1243 node_output.append(str(val))
1245 output.append(node_output)
1250 class LUAddNode(LogicalUnit):
1251 """Logical unit for adding node to the cluster.
1255 HTYPE = constants.HTYPE_NODE
1256 _OP_REQP = ["node_name"]
1258 def BuildHooksEnv(self):
1261 This will run on all nodes before, and on all nodes + the new node after.
1265 "NODE_NAME": self.op.node_name,
1266 "NODE_PIP": self.op.primary_ip,
1267 "NODE_SIP": self.op.secondary_ip,
1269 nodes_0 = self.cfg.GetNodeList()
1270 nodes_1 = nodes_0 + [self.op.node_name, ]
1271 return env, nodes_0, nodes_1
1273 def CheckPrereq(self):
1274 """Check prerequisites.
1277 - the new node is not already in the config
1279 - its parameters (single/dual homed) matches the cluster
1281 Any errors are signalled by raising errors.OpPrereqError.
1284 node_name = self.op.node_name
    dns_data = utils.LookupHostname(node_name)
    if not dns_data:
      raise errors.OpPrereqError("Node %s is not resolvable" % node_name)
1291 node = dns_data['hostname']
1292 primary_ip = self.op.primary_ip = dns_data['ip']
1293 secondary_ip = getattr(self.op, "secondary_ip", None)
1294 if secondary_ip is None:
1295 secondary_ip = primary_ip
1296 if not utils.IsValidIP(secondary_ip):
1297 raise errors.OpPrereqError("Invalid secondary IP given")
1298 self.op.secondary_ip = secondary_ip
1299 node_list = cfg.GetNodeList()
1300 if node in node_list:
1301 raise errors.OpPrereqError("Node %s is already in the configuration"
1304 for existing_node_name in node_list:
1305 existing_node = cfg.GetNodeInfo(existing_node_name)
1306 if (existing_node.primary_ip == primary_ip or
1307 existing_node.secondary_ip == primary_ip or
1308 existing_node.primary_ip == secondary_ip or
1309 existing_node.secondary_ip == secondary_ip):
1310 raise errors.OpPrereqError("New node ip address(es) conflict with"
1311 " existing node %s" % existing_node.name)
1313 # check that the type of the node (single versus dual homed) is the
1314 # same as for the master
1315 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1316 master_singlehomed = myself.secondary_ip == myself.primary_ip
1317 newbie_singlehomed = secondary_ip == primary_ip
1318 if master_singlehomed != newbie_singlehomed:
1319 if master_singlehomed:
1320 raise errors.OpPrereqError("The master has no private ip but the"
1321 " new node has one")
1323 raise errors.OpPrereqError("The master has a private ip but the"
1324 " new node doesn't have one")
    # checks reachability
1327 command = ["fping", "-q", primary_ip]
    result = utils.RunCmd(command)
    if result.failed:
      raise errors.OpPrereqError("Node not reachable by ping")
1332 if not newbie_singlehomed:
1333 # check reachability from my secondary ip to newbie's secondary ip
1334 command = ["fping", "-S%s" % myself.secondary_ip, "-q", secondary_ip]
      result = utils.RunCmd(command)
      if result.failed:
        raise errors.OpPrereqError("Node secondary ip not reachable by ping")
1339 self.new_node = objects.Node(name=node,
1340 primary_ip=primary_ip,
1341 secondary_ip=secondary_ip)
1343 def Exec(self, feedback_fn):
1344 """Adds the new node to the cluster.
1347 new_node = self.new_node
1348 node = new_node.name
1350 # set up inter-node password and certificate and restarts the node daemon
1351 gntpass = self.sstore.GetNodeDaemonPassword()
1352 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1353 raise errors.OpExecError("ganeti password corruption detected")
1354 f = open(constants.SSL_CERT_FILE)
1356 gntpem = f.read(8192)
1359 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1360 # so we use this to detect an invalid certificate; as long as the
1361 # cert doesn't contain this, the here-document will be correctly
1362 # parsed by the shell sequence below
1363 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1364 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1365 if not gntpem.endswith("\n"):
1366 raise errors.OpExecError("PEM must end with newline")
1367 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1369 # remove first the root's known_hosts file
1370 utils.RemoveFile("/root/.ssh/known_hosts")
1371 # and then connect with ssh to set password and start ganeti-noded
1372 # note that all the below variables are sanitized at this point,
1373 # either by being constants or by the checks above
1375 mycommand = ("umask 077 && "
1376 "echo '%s' > '%s' && "
1377 "cat > '%s' << '!EOF.' && \n"
1378 "%s!EOF.\n%s restart" %
1379 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1380 constants.SSL_CERT_FILE, gntpem,
1381 constants.NODE_INITD_SCRIPT))
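
    # When expanded, mycommand amounts to something like the following
    # on the remote shell (password, file paths and init script are
    # illustrative placeholders):
    #
    #   umask 077 && echo 'PASS' > '/var/lib/ganeti/ssconf_node_pass' &&
    #   cat > '/etc/ganeti/server.pem' << '!EOF.' &&
    #   <PEM data>!EOF.
    #   /etc/init.d/ganeti restart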
1383 result = ssh.SSHCall(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))
1389 # check connectivity
1392 result = rpc.call_version([node])[node]
1394 if constants.PROTOCOL_VERSION == result:
1395 logger.Info("communication to node %s fine, sw version %s match" %
1398 raise errors.OpExecError("Version mismatch master version %s,"
1399 " node version %s" %
1400 (constants.PROTOCOL_VERSION, result))
1402 raise errors.OpExecError("Cannot get version from the new node")
1405 logger.Info("copy ssh key to node %s" % node)
1407 keyfiles = ["/etc/ssh/ssh_host_dsa_key", "/etc/ssh/ssh_host_dsa_key.pub",
1408 "/etc/ssh/ssh_host_rsa_key", "/etc/ssh/ssh_host_rsa_key.pub",
1409 "/root/.ssh/id_dsa", "/root/.ssh/id_dsa.pub"]
1414 keyarray.append(f.read())
1418 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1419 keyarray[3], keyarray[4], keyarray[5])
1422 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1424 # Add node to our /etc/hosts, and add key to known_hosts
1425 _UpdateEtcHosts(new_node.name, new_node.primary_ip)
1426 _UpdateKnownHosts(new_node.name, new_node.primary_ip,
1427 self.cfg.GetHostKey())
1429 if new_node.secondary_ip != new_node.primary_ip:
1430 result = ssh.SSHCall(node, "root",
1431 "fping -S 127.0.0.1 -q %s" % new_node.secondary_ip)
1433 raise errors.OpExecError("Node claims it doesn't have the"
1434 " secondary ip you gave (%s).\n"
1435 "Please fix and re-run this command." %
1436 new_node.secondary_ip)
1438 # Distribute updated /etc/hosts and known_hosts to all nodes,
1439 # including the node just added
1440 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1441 dist_nodes = self.cfg.GetNodeList() + [node]
1442 if myself.name in dist_nodes:
1443 dist_nodes.remove(myself.name)
1445 logger.Debug("Copying hosts and known_hosts to all nodes")
1446 for fname in ("/etc/hosts", "/etc/ssh/ssh_known_hosts"):
1447 result = rpc.call_upload_file(dist_nodes, fname)
1448 for to_node in dist_nodes:
1449 if not result[to_node]:
1450 logger.Error("copy of file %s to node %s failed" %
1453 to_copy = ss.GetFileList()
1454 for fname in to_copy:
1455 if not ssh.CopyFileToNode(node, fname):
1456 logger.Error("could not copy file %s to node %s" % (fname, node))
1458 logger.Info("adding node %s to cluster.conf" % node)
1459 self.cfg.AddNode(new_node)
1462 class LUMasterFailover(LogicalUnit):
1463 """Failover the master node to the current node.
1465 This is a special LU in that it must run on a non-master node.
1468 HPATH = "master-failover"
1469 HTYPE = constants.HTYPE_CLUSTER
1473 def BuildHooksEnv(self):
1476 This will run on the new master only in the pre phase, and on all
1477 the nodes in the post phase.
1481 "NEW_MASTER": self.new_master,
1482 "OLD_MASTER": self.old_master,
1484 return env, [self.new_master], self.cfg.GetNodeList()
1486 def CheckPrereq(self):
1487 """Check prerequisites.
1489 This checks that we are not already the master.
1492 self.new_master = socket.gethostname()
1494 self.old_master = self.sstore.GetMasterNode()
1496 if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
1498 " where you want the new master to be.\n"
1499 "%s is already the master" %
1502 def Exec(self, feedback_fn):
1503 """Failover the master node.
1505 This command, when run on a non-master node, will cause the current
1506 master to cease being master, and the non-master to become new
1510 #TODO: do not rely on gethostname returning the FQDN
1511 logger.Info("setting master to %s, old master: %s" %
1512 (self.new_master, self.old_master))
1514 if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
1516 " %s, please disable manually" % self.old_master)
1519 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1520 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1521 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1522 logger.Error("could not distribute the new simple store master file"
1523 " to the other nodes, please check.")
1525 if not rpc.call_node_start_master(self.new_master):
1526 logger.Error("could not start the master role on the new master"
1527 " %s, please check" % self.new_master)
1528 feedback_fn("Error in activating the master IP on the new master,\n"
1529 "please fix manually.")
1533 class LUQueryClusterInfo(NoHooksLU):
1534 """Query cluster configuration.
1540 def CheckPrereq(self):
    """No prerequisites needed for this LU.
1546 def Exec(self, feedback_fn):
1547 """Return cluster config.
1551 "name": self.sstore.GetClusterName(),
1552 "software_version": constants.RELEASE_VERSION,
1553 "protocol_version": constants.PROTOCOL_VERSION,
1554 "config_version": constants.CONFIG_VERSION,
1555 "os_api_version": constants.OS_API_VERSION,
1556 "export_version": constants.EXPORT_VERSION,
1557 "master": self.sstore.GetMasterNode(),
1558 "architecture": (platform.architecture()[0], platform.machine()),
1564 class LUClusterCopyFile(NoHooksLU):
1565 """Copy file to cluster.
1568 _OP_REQP = ["nodes", "filename"]
1570 def CheckPrereq(self):
1571 """Check prerequisites.
    It should check that the named file exists and that the given list
    of nodes is valid.
1577 if not os.path.exists(self.op.filename):
1578 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1580 self.nodes = _GetWantedNodes(self, self.op.nodes)
1582 def Exec(self, feedback_fn):
1583 """Copy a file from master to some nodes.
1586 opts - class with options as members
1587 args - list containing a single element, the file name
1589 nodes - list containing the name of target nodes; if empty, all nodes
1592 filename = self.op.filename
1594 myname = socket.gethostname()
1596 for node in self.nodes:
1599 if not ssh.CopyFileToNode(node, filename):
1600 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1603 class LUDumpClusterConfig(NoHooksLU):
1604 """Return a text-representation of the cluster-config.
1609 def CheckPrereq(self):
1610 """No prerequisites.
1615 def Exec(self, feedback_fn):
1616 """Dump a representation of the cluster config to the standard output.
1619 return self.cfg.DumpConfig()
1622 class LURunClusterCommand(NoHooksLU):
1623 """Run a command on some nodes.
1626 _OP_REQP = ["command", "nodes"]
1628 def CheckPrereq(self):
1629 """Check prerequisites.
1631 It checks that the given list of nodes is valid.
1634 self.nodes = _GetWantedNodes(self, self.op.nodes)
1636 def Exec(self, feedback_fn):
1637 """Run a command on some nodes.
1641 for node in self.nodes:
1642 result = utils.RunCmd(["ssh", node.name, self.op.command])
1643 data.append((node.name, result.cmd, result.output, result.exit_code))
1648 class LUActivateInstanceDisks(NoHooksLU):
1649 """Bring up an instance's disks.
1652 _OP_REQP = ["instance_name"]
1654 def CheckPrereq(self):
1655 """Check prerequisites.
1657 This checks that the instance is in the cluster.
1660 instance = self.cfg.GetInstanceInfo(
1661 self.cfg.ExpandInstanceName(self.op.instance_name))
1662 if instance is None:
1663 raise errors.OpPrereqError("Instance '%s' not known" %
1664 self.op.instance_name)
1665 self.instance = instance
1668 def Exec(self, feedback_fn):
1669 """Activate the disks.
1672 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1674 raise errors.OpExecError("Cannot activate block devices")
1679 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1680 """Prepare the block devices for an instance.
1682 This sets up the block devices on all nodes.
1685 instance: a ganeti.objects.Instance object
1686 ignore_secondaries: if true, errors on secondary nodes won't result
1687 in an error return from the function
1690 false if the operation failed
1691 list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices
1696 for inst_disk in instance.disks:
1697 master_result = None
1698 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1699 cfg.SetDiskID(node_disk, node)
1700 is_primary = node == instance.primary_node
1701 result = rpc.call_blockdev_assemble(node, node_disk, is_primary)
1703 logger.Error("could not prepare block device %s on node %s (is_pri"
1704 "mary=%s)" % (inst_disk.iv_name, node, is_primary))
1705 if is_primary or not ignore_secondaries:
1708 master_result = result
1709 device_info.append((instance.primary_node, inst_disk.iv_name,
1712 return disks_ok, device_info
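
# Each device_info element above is (primary_node, iv_name, result of
# blockdev_assemble on the primary, i.e. the node-visible device); most
# callers only inspect the boolean disks_ok.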
1715 def _StartInstanceDisks(cfg, instance, force):
1716 """Start the disks of an instance.
1719 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1720 ignore_secondaries=force)
1722 _ShutdownInstanceDisks(instance, cfg)
1723 if force is not None and not force:
1724 logger.Error("If the message above refers to a secondary node,"
1725 " you can retry the operation using '--force'.")
1726 raise errors.OpExecError("Disk consistency error")
1729 class LUDeactivateInstanceDisks(NoHooksLU):
1730 """Shutdown an instance's disks.
1733 _OP_REQP = ["instance_name"]
1735 def CheckPrereq(self):
1736 """Check prerequisites.
1738 This checks that the instance is in the cluster.
1741 instance = self.cfg.GetInstanceInfo(
1742 self.cfg.ExpandInstanceName(self.op.instance_name))
1743 if instance is None:
1744 raise errors.OpPrereqError("Instance '%s' not known" %
1745 self.op.instance_name)
1746 self.instance = instance
1748 def Exec(self, feedback_fn):
    """Deactivate the disks.
1752 instance = self.instance
1753 ins_l = rpc.call_instance_list([instance.primary_node])
1754 ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
1756 raise errors.OpExecError("Can't contact node '%s'" %
1757 instance.primary_node)
1759 if self.instance.name in ins_l:
1760 raise errors.OpExecError("Instance is running, can't shutdown"
1763 _ShutdownInstanceDisks(instance, self.cfg)
1766 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1767 """Shutdown block devices of an instance.
1769 This does the shutdown on all nodes of the instance.
1771 If the ignore_primary is false, errors on the primary node are
1776 for disk in instance.disks:
1777 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1778 cfg.SetDiskID(top_disk, node)
1779 if not rpc.call_blockdev_shutdown(node, top_disk):
1780 logger.Error("could not shutdown block device %s on node %s" %
1781 (disk.iv_name, node))
1782 if not ignore_primary or node != instance.primary_node:
1787 class LUStartupInstance(LogicalUnit):
1788 """Starts an instance.
1791 HPATH = "instance-start"
1792 HTYPE = constants.HTYPE_INSTANCE
1793 _OP_REQP = ["instance_name", "force"]
1795 def BuildHooksEnv(self):
1798 This runs on master, primary and secondary nodes of the instance.
1802 "FORCE": self.op.force,
1804 env.update(_BuildInstanceHookEnvByObject(self.instance))
1805 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1806 list(self.instance.secondary_nodes))
1809 def CheckPrereq(self):
1810 """Check prerequisites.
1812 This checks that the instance is in the cluster.
1815 instance = self.cfg.GetInstanceInfo(
1816 self.cfg.ExpandInstanceName(self.op.instance_name))
1817 if instance is None:
1818 raise errors.OpPrereqError("Instance '%s' not known" %
1819 self.op.instance_name)
1821 # check bridges existance
1822 brlist = [nic.bridge for nic in instance.nics]
1823 if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
1826 (brlist, instance.primary_node))
1828 self.instance = instance
1829 self.op.instance_name = instance.name
1831 def Exec(self, feedback_fn):
1832 """Start the instance.
1835 instance = self.instance
1836 force = self.op.force
1837 extra_args = getattr(self.op, "extra_args", "")
1839 node_current = instance.primary_node
1841 nodeinfo = rpc.call_node_info([node_current], self.cfg.GetVGName())
      raise errors.OpExecError("Could not contact node %s for information" %
                               node_current)
1846 freememory = nodeinfo[node_current]['memory_free']
1847 memory = instance.memory
1848 if memory > freememory:
1849 raise errors.OpExecError("Not enough memory to start instance"
1851 " needed %s MiB, available %s MiB" %
1852 (instance.name, node_current, memory,
1855 _StartInstanceDisks(self.cfg, instance, force)
1857 if not rpc.call_instance_start(node_current, instance, extra_args):
1858 _ShutdownInstanceDisks(instance, self.cfg)
1859 raise errors.OpExecError("Could not start instance")
1861 self.cfg.MarkInstanceUp(instance.name)
1864 class LUShutdownInstance(LogicalUnit):
1865 """Shutdown an instance.
1868 HPATH = "instance-stop"
1869 HTYPE = constants.HTYPE_INSTANCE
1870 _OP_REQP = ["instance_name"]
1872 def BuildHooksEnv(self):
1875 This runs on master, primary and secondary nodes of the instance.
1878 env = _BuildInstanceHookEnvByObject(self.instance)
1879 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1880 list(self.instance.secondary_nodes))
1883 def CheckPrereq(self):
1884 """Check prerequisites.
1886 This checks that the instance is in the cluster.
1889 instance = self.cfg.GetInstanceInfo(
1890 self.cfg.ExpandInstanceName(self.op.instance_name))
1891 if instance is None:
1892 raise errors.OpPrereqError("Instance '%s' not known" %
1893 self.op.instance_name)
1894 self.instance = instance
1896 def Exec(self, feedback_fn):
1897 """Shutdown the instance.
1900 instance = self.instance
1901 node_current = instance.primary_node
1902 if not rpc.call_instance_shutdown(node_current, instance):
1903 logger.Error("could not shutdown instance")
1905 self.cfg.MarkInstanceDown(instance.name)
1906 _ShutdownInstanceDisks(instance, self.cfg)
1909 class LUReinstallInstance(LogicalUnit):
1910 """Reinstall an instance.
1913 HPATH = "instance-reinstall"
1914 HTYPE = constants.HTYPE_INSTANCE
1915 _OP_REQP = ["instance_name"]
1917 def BuildHooksEnv(self):
1920 This runs on master, primary and secondary nodes of the instance.
1923 env = _BuildInstanceHookEnvByObject(self.instance)
1924 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
1925 list(self.instance.secondary_nodes))
1928 def CheckPrereq(self):
1929 """Check prerequisites.
1931 This checks that the instance is in the cluster and is not running.
1934 instance = self.cfg.GetInstanceInfo(
1935 self.cfg.ExpandInstanceName(self.op.instance_name))
1936 if instance is None:
1937 raise errors.OpPrereqError("Instance '%s' not known" %
1938 self.op.instance_name)
1939 if instance.disk_template == constants.DT_DISKLESS:
1940 raise errors.OpPrereqError("Instance '%s' has no disks" %
1941 self.op.instance_name)
1942 if instance.status != "down":
1943 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
1944 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
1948 (self.op.instance_name,
1949 instance.primary_node))
1951 self.op.os_type = getattr(self.op, "os_type", None)
1952 if self.op.os_type is not None:
1954 pnode = self.cfg.GetNodeInfo(
1955 self.cfg.ExpandNodeName(instance.primary_node))
1957 raise errors.OpPrereqError("Primary node '%s' is unknown" %
1959 os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
1960 if not isinstance(os_obj, objects.OS):
1961 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
1962 " primary node" % self.op.os_type)
1964 self.instance = instance
1966 def Exec(self, feedback_fn):
1967 """Reinstall the instance.
1970 inst = self.instance
1972 if self.op.os_type is not None:
1973 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
1974 inst.os = self.op.os_type
1975 self.cfg.AddInstance(inst)
1977 _StartInstanceDisks(self.cfg, inst, None)
1979 feedback_fn("Running the instance OS create scripts...")
1980 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
      raise errors.OpExecError("Could not install OS for instance %s"
                               " on node %s" %
                               (inst.name, inst.primary_node))
1985 _ShutdownInstanceDisks(inst, self.cfg)
1988 class LURemoveInstance(LogicalUnit):
1989 """Remove an instance.
1992 HPATH = "instance-remove"
1993 HTYPE = constants.HTYPE_INSTANCE
1994 _OP_REQP = ["instance_name"]
1996 def BuildHooksEnv(self):
1999 This runs on master, primary and secondary nodes of the instance.
2002 env = _BuildInstanceHookEnvByObject(self.instance)
2003 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2004 list(self.instance.secondary_nodes))
2007 def CheckPrereq(self):
2008 """Check prerequisites.
2010 This checks that the instance is in the cluster.
2013 instance = self.cfg.GetInstanceInfo(
2014 self.cfg.ExpandInstanceName(self.op.instance_name))
2015 if instance is None:
2016 raise errors.OpPrereqError("Instance '%s' not known" %
2017 self.op.instance_name)
2018 self.instance = instance
2020 def Exec(self, feedback_fn):
2021 """Remove the instance.
2024 instance = self.instance
2025 logger.Info("shutting down instance %s on node %s" %
2026 (instance.name, instance.primary_node))
2028 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2029 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2030 (instance.name, instance.primary_node))
2032 logger.Info("removing block devices for instance %s" % instance.name)
2034 _RemoveDisks(instance, self.cfg)
2036 logger.Info("removing instance %s out of cluster config" % instance.name)
2038 self.cfg.RemoveInstance(instance.name)
2041 class LUQueryInstances(NoHooksLU):
2042 """Logical unit for querying instances.
2045 _OP_REQP = ["output_fields"]
2047 def CheckPrereq(self):
2048 """Check prerequisites.
2050 This checks that the fields required are valid output fields.
2053 self.dynamic_fields = frozenset(["oper_state", "oper_ram"])
2054 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2055 "admin_state", "admin_ram",
2056 "disk_template", "ip", "mac", "bridge"],
2057 dynamic=self.dynamic_fields,
2058 selected=self.op.output_fields)
2060 def Exec(self, feedback_fn):
    """Computes the list of instances and their attributes.
2064 instance_names = utils.NiceSort(self.cfg.GetInstanceList())
2065 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2068 # begin data gathering
2070 nodes = frozenset([inst.primary_node for inst in instance_list])
2073 if self.dynamic_fields.intersection(self.op.output_fields):
2075 node_data = rpc.call_all_instances_info(nodes)
2077 result = node_data[name]
2079 live_data.update(result)
2080 elif result == False:
2081 bad_nodes.append(name)
2082 # else no instance is alive
2084 live_data = dict([(name, {}) for name in instance_names])
2086 # end data gathering
2089 for instance in instance_list:
2091 for field in self.op.output_fields:
2096 elif field == "pnode":
2097 val = instance.primary_node
2098 elif field == "snodes":
2099 val = ",".join(instance.secondary_nodes) or "-"
2100 elif field == "admin_state":
2101 if instance.status == "down":
2105 elif field == "oper_state":
2106 if instance.primary_node in bad_nodes:
2109 if live_data.get(instance.name):
2113 elif field == "admin_ram":
2114 val = instance.memory
2115 elif field == "oper_ram":
2116 if instance.primary_node in bad_nodes:
2118 elif instance.name in live_data:
2119 val = live_data[instance.name].get("memory", "?")
2122 elif field == "disk_template":
2123 val = instance.disk_template
2125 val = instance.nics[0].ip
2126 elif field == "bridge":
2127 val = instance.nics[0].bridge
2128 elif field == "mac":
2129 val = instance.nics[0].mac
2131 raise errors.ParameterError(field)
2139 class LUFailoverInstance(LogicalUnit):
2140 """Failover an instance.
2143 HPATH = "instance-failover"
2144 HTYPE = constants.HTYPE_INSTANCE
2145 _OP_REQP = ["instance_name", "ignore_consistency"]
2147 def BuildHooksEnv(self):
2150 This runs on master, primary and secondary nodes of the instance.
2154 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2156 env.update(_BuildInstanceHookEnvByObject(self.instance))
2157 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2160 def CheckPrereq(self):
2161 """Check prerequisites.
2163 This checks that the instance is in the cluster.
2166 instance = self.cfg.GetInstanceInfo(
2167 self.cfg.ExpandInstanceName(self.op.instance_name))
2168 if instance is None:
2169 raise errors.OpPrereqError("Instance '%s' not known" %
2170 self.op.instance_name)
2172 # check memory requirements on the secondary node
2173 target_node = instance.secondary_nodes[0]
2174 nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
2175 info = nodeinfo.get(target_node, None)
      raise errors.OpPrereqError("Cannot get current information"
                                 " from node '%s'" % target_node)
2179 if instance.memory > info['memory_free']:
2180 raise errors.OpPrereqError("Not enough memory on target node %s."
2181 " %d MB available, %d MB required" %
2182 (target_node, info['memory_free'],
2185 # check bridge existance
2186 brlist = [nic.bridge for nic in instance.nics]
2187 if not rpc.call_bridges_exist(instance.primary_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
2189 " exist on destination node '%s'" %
2190 (brlist, instance.primary_node))
2192 self.instance = instance
  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for remote_raid1, these are md over drbd
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* checking target node resource availability")
    nodeinfo = rpc.call_node_info([target_node], self.cfg.GetVGName())
    if not nodeinfo:
      raise errors.OpExecError("Could not contact target node %s." %
                               target_node)

    free_memory = int(nodeinfo[target_node]['memory_free'])
    memory = instance.memory
    if memory > free_memory:
      raise errors.OpExecError("Not enough memory to create instance %s on"
                               " node %s. needed %s MiB, available %s MiB" %
                               (instance.name, target_node, memory,
                                free_memory))

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                   " anyway. Please make sure node %s is down" %
                   (instance.name, source_node, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.AddInstance(instance)

    feedback_fn("* activating the instance's disks on target node")
    logger.Info("Starting instance %s on node %s" %
                (instance.name, target_node))

    disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                             ignore_secondaries=True)
    if not disks_ok:
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Can't activate the instance's disks")

    feedback_fn("* starting the instance on the target node")
    if not rpc.call_instance_start(target_node, instance, None):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance %s on node %s." %
                               (instance.name, target_node))


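# The two helpers below walk a Disk object tree and create the physical
# devices bottom-up: children first, then the device itself.  The id
# returned by the node is recorded as the device's physical_id the first
# time the device is created.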
def _CreateBlockDevOnPrimary(cfg, node, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


def _CreateBlockDevOnSecondary(cfg, node, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.  If not, just recurse to children keeping the same
  'force' value.
  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True


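# Example of the 'force' logic above (assuming the remote_raid1 layout):
# the md_raid1 top device lives only on the primary, while the drbd layer
# reports CreateOnSecondary(); from that level down, force is true and the
# data/meta LVs below the drbd device are created on the secondary too.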
def _GenerateUniqueNames(cfg, exts):
  """Generate unique LV names, one for each of the given extensions.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results


def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
  """Generate a drbd device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type="lvm", size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type="lvm", size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type="drbd", size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta])
  return drbd_dev


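# The branch returned above is a small device tree (sketch):
#
#   drbd, logical_id=(primary, secondary, port)
#   |-- lvm data volume, <size> MB
#   `-- lvm meta volume, 128 MB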
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == "diskless":
    disks = []
  elif template_name == "plain":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type="lvm", size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type="lvm", size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == "local_raid1":
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda_m1", ".sda_m2",
                                       ".sdb_m1", ".sdb_m2"])
    sda_dev_m1 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, names[0]))
    sda_dev_m2 = objects.Disk(dev_type="lvm", size=disk_sz,
                              logical_id=(vgname, names[1]))
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              size=disk_sz,
                              children=[sda_dev_m1, sda_dev_m2])
    sdb_dev_m1 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, names[2]))
    sdb_dev_m2 = objects.Disk(dev_type="lvm", size=swap_sz,
                              logical_id=(vgname, names[3]))
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              size=swap_sz,
                              children=[sdb_dev_m1, sdb_dev_m2])
    disks = [md_sda_dev, md_sdb_dev]
  elif template_name == "remote_raid1":
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         disk_sz, names[0:2])
    md_sda_dev = objects.Disk(dev_type="md_raid1", iv_name="sda",
                              children=[drbd_sda_dev], size=disk_sz)
    drbd_sdb_dev = _GenerateMDDRBDBranch(cfg, primary_node, remote_node,
                                         swap_sz, names[2:4])
    md_sdb_dev = objects.Disk(dev_type="md_raid1", iv_name="sdb",
                              children=[drbd_sdb_dev], size=swap_sz)
    disks = [md_sda_dev, md_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" %
                                 template_name)
  return disks


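# Layouts produced above, per template (sketch):
#   diskless     -> no disks
#   plain        -> sda/sdb are plain LVs on the primary node
#   local_raid1  -> sda/sdb are md_raid1 arrays over two local LVs each
#   remote_raid1 -> sda/sdb are md_raid1 arrays over a drbd device that
#                   mirrors the data to the secondary node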
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name


def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, device, False,
                                        info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False
  return True


def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False
  return result


class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size", "pnode",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge)]))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)
      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []
    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    if self.op.disk_template == constants.DT_REMOTE_RAID1:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The 'remote_raid1' disk template needs"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    # Check lv size requirements
    nodenames = [pnode.name] + self.secondaries
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())

    # Required free disk space as a function of disk and swap space
    req_size_dict = {
      constants.DT_DISKLESS: 0,
      constants.DT_PLAIN: self.op.disk_size + self.op.swap_size,
      constants.DT_LOCAL_RAID1: (self.op.disk_size + self.op.swap_size) * 2,
      # 256 MB are added for drbd metadata, 128 MB for each drbd device
      constants.DT_REMOTE_RAID1: self.op.disk_size + self.op.swap_size + 256,
      }

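    # Worked example: a remote_raid1 instance with disk_size=10240 and
    # swap_size=4096 needs 10240 + 4096 + 256 = 14592 MB free in the
    # volume group, on the primary and on the secondary node alike.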
    if self.op.disk_template not in req_size_dict:
      raise errors.ProgrammerError("Disk template '%s' size requirement"
                                   " is unknown" % self.op.disk_template)

    req_size = req_size_dict[self.op.disk_template]

    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      if req_size > info['vg_free']:
        raise errors.OpPrereqError("Not enough disk space on target node %s."
                                   " %d MB available, %d MB required" %
                                   (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get([pnode.name], self.op.os_type)[pnode.name]
    if not isinstance(os_obj, objects.OS):
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    # instance verification
    hostname1 = utils.LookupHostname(self.op.instance_name)
    if not hostname1:
      raise errors.OpPrereqError("Instance name '%s' not found in dns" %
                                 self.op.instance_name)

    self.op.instance_name = instance_name = hostname1['hostname']
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1['ip']
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = inst_ip

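    # check that the instance's IP is not already live on the network;
    # RunCmd flags 'failed' on a non-zero exit code and fping exits 0 only
    # when the target answered, so a successful run means the IP is in use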
    command = ["fping", "-q", hostname1['ip']]
    result = utils.RunCmd(command)
    if not result.failed:
      raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                 (hostname1['ip'], instance_name))

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'

  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    nic = objects.NIC(bridge=self.op.bridge, mac=self.cfg.GenerateMAC())
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj)
    elif iobj.disk_template == "remote_raid1":
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")


class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Connect to the console of an instance.

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance.name)
    return node, console_cmd


class LUAddMDDRBDComponent(LogicalUnit):
  """Add a new mirror member to an instance's disk.

  """
  HPATH = "mirror-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "remote_node", "disk_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "DISK_NAME": self.op.disk_name,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.remote_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
    if remote_node is None:
      raise errors.OpPrereqError("Node '%s' not known" % self.op.remote_node)
    self.remote_node = remote_node

    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

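    # for/else below: the else clause runs only when the loop finishes
    # without hitting 'break', i.e. when no disk matched the given iv_name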
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    if len(disk.children) > 1:
      raise errors.OpPrereqError("The device already has two slave"
                                 " devices.\n"
                                 "This would create a 3-disk raid1"
                                 " which we don't allow.")
    self.disk = disk

  def Exec(self, feedback_fn):
    """Add the mirror component.

    """
    disk = self.disk
    instance = self.instance

    remote_node = self.remote_node
    lv_names = [".%s_%s" % (disk.iv_name, suf) for suf in ["data", "meta"]]
    names = _GenerateUniqueNames(self.cfg, lv_names)
    new_drbd = _GenerateMDDRBDBranch(self.cfg, instance.primary_node,
                                     remote_node, disk.size, names)

    logger.Info("adding new mirror component on secondary")
    if not _CreateBlockDevOnSecondary(self.cfg, remote_node, new_drbd, False,
                                      _GetInstanceInfoText(instance)):
      raise errors.OpExecError("Failed to create new component on secondary"
                               " node %s" % remote_node)

    logger.Info("adding new mirror component on primary")
    if not _CreateBlockDevOnPrimary(self.cfg, instance.primary_node, new_drbd,
                                    _GetInstanceInfoText(instance)):
      # remove secondary dev
      self.cfg.SetDiskID(new_drbd, remote_node)
      rpc.call_blockdev_remove(remote_node, new_drbd)
      raise errors.OpExecError("Failed to create volume on primary")

    # the device exists now
    # call the primary node to add the mirror to md
    logger.Info("adding new mirror component to md")
    if not rpc.call_blockdev_addchild(instance.primary_node,
                                      disk, new_drbd):
      logger.Error("Can't add mirror component to md!")
      self.cfg.SetDiskID(new_drbd, remote_node)
      if not rpc.call_blockdev_remove(remote_node, new_drbd):
        logger.Error("Can't rollback on secondary")
      self.cfg.SetDiskID(new_drbd, instance.primary_node)
      if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
        logger.Error("Can't rollback on primary")
      raise errors.OpExecError("Can't add mirror component to md array")

    disk.children.append(new_drbd)

    self.cfg.AddInstance(instance)

    _WaitForSync(self.cfg, instance)


class LURemoveMDDRBDComponent(LogicalUnit):
  """Remove a component from a remote_raid1 disk.

  """
  HPATH = "mirror-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk_name", "disk_id"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK_NAME": self.op.disk_name,
      "DISK_ID": self.op.disk_id,
      "OLD_SECONDARY": self.old_secondary,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")
    for disk in instance.disks:
      if disk.iv_name == self.op.disk_name:
        break
    else:
      raise errors.OpPrereqError("Can't find this device ('%s') in the"
                                 " instance." % self.op.disk_name)
    for child in disk.children:
      if child.dev_type == "drbd" and child.logical_id[2] == self.op.disk_id:
        break
    else:
      raise errors.OpPrereqError("Can't find the device with this port.")

    if len(disk.children) < 2:
      raise errors.OpPrereqError("Cannot remove the last component from"
                                 " a mirror.")
    self.disk = disk
    self.child = child

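    # a drbd logical_id is (nodeA, nodeB, port); whichever of the first
    # two entries is not the primary node is the old secondary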
    if self.child.logical_id[0] == instance.primary_node:
      oid = 1
    else:
      oid = 0
    self.old_secondary = self.child.logical_id[oid]

  def Exec(self, feedback_fn):
    """Remove the mirror component.

    """
    instance = self.instance
    disk = self.disk
    child = self.child
    logger.Info("remove mirror component")
    self.cfg.SetDiskID(disk, instance.primary_node)
    if not rpc.call_blockdev_removechild(instance.primary_node,
                                         disk, child):
      raise errors.OpExecError("Can't remove child from mirror.")

    for node in child.logical_id[:2]:
      self.cfg.SetDiskID(child, node)
      if not rpc.call_blockdev_remove(node, child):
        logger.Error("Warning: failed to remove device from node %s,"
                     " continuing operation." % node)

    disk.children.remove(child)
    self.cfg.AddInstance(instance)


class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

    if instance.disk_template != constants.DT_REMOTE_RAID1:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " remote_raid1.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    remote_node = getattr(self.op, "remote_node", None)
    if remote_node is None:
      remote_node = instance.secondary_nodes[0]
    else:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    self.op.remote_node = remote_node

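  # The strategy in Exec() below is, for each disk of the instance:
  #   1. build a fresh drbd branch (data+meta LVs mirrored to remote_node)
  #   2. create it on the secondary, then on the primary
  #   3. attach it as an extra md mirror child and wait for the sync
  #   4. once every mirror is clean, detach and delete the old branch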
  def Exec(self, feedback_fn):
    """Replace the disks of an instance.

    """
    instance = self.instance
    iv_names = {}
    # start of work
    remote_node = self.op.remote_node
    cfg = self.cfg
    vgname = cfg.GetVGName()
    for dev in instance.disks:
      size = dev.size
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
                                       remote_node, size, names)
      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
      logger.Info("adding new mirror component on secondary for %s" %
                  dev.iv_name)
      if not _CreateBlockDevOnSecondary(cfg, remote_node, new_drbd, False,
                                        _GetInstanceInfoText(instance)):
        raise errors.OpExecError("Failed to create new component on"
                                 " secondary node %s\n"
                                 "Full abort, cleanup manually!" %
                                 remote_node)

      logger.Info("adding new mirror component on primary")
      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node, new_drbd,
                                      _GetInstanceInfoText(instance)):
        # remove secondary dev
        cfg.SetDiskID(new_drbd, remote_node)
        rpc.call_blockdev_remove(remote_node, new_drbd)
        raise errors.OpExecError("Failed to create volume on primary!\n"
                                 "Full abort, cleanup manually!!")

      # the device exists now
      # call the primary node to add the mirror to md
      logger.Info("adding new mirror component to md")
      if not rpc.call_blockdev_addchild(instance.primary_node, dev,
                                        new_drbd):
        logger.Error("Can't add mirror component to md!")
        cfg.SetDiskID(new_drbd, remote_node)
        if not rpc.call_blockdev_remove(remote_node, new_drbd):
          logger.Error("Can't rollback on secondary")
        cfg.SetDiskID(new_drbd, instance.primary_node)
        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
          logger.Error("Can't rollback on primary")
        raise errors.OpExecError("Full abort, cleanup manually!!")

      dev.children.append(new_drbd)
      cfg.AddInstance(instance)

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    _WaitForSync(cfg, instance, unlock=True)

    # so check manually all the devices
    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("MD device %s is degraded!" % name)
      cfg.SetDiskID(new_drbd, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
      if is_degr:
        raise errors.OpExecError("New drbd device %s is degraded!" % name)

    for name in iv_names:
      dev, child, new_drbd = iv_names[name]
      logger.Info("remove mirror %s component" % name)
      cfg.SetDiskID(dev, instance.primary_node)
      if not rpc.call_blockdev_removechild(instance.primary_node,
                                           dev, child):
        logger.Error("Can't remove child from mirror, aborting"
                     " *this device cleanup*.\nYou need to cleanup manually!!")
        continue

      for node in child.logical_id[:2]:
        logger.Info("remove child device on %s" % node)
        cfg.SetDiskID(child, node)
        if not rpc.call_blockdev_remove(node, child):
          logger.Error("Warning: failed to remove device from node %s,"
                       " continuing operation." % node)

      dev.children.remove(child)

      cfg.AddInstance(instance)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type == "drbd":
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data

  def Exec(self, feedback_fn):
    """Gather and return data.

    """
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        }

      result[instance.name] = idict

    return result


class LUQueryNodeData(NoHooksLU):
  """Logical unit for querying node data.

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional node list against the existing names.

    """
    self.wanted_nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Compute and return the list of nodes.

    """
    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]
    result = []
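    # each result entry is a tuple of:
    #   (name, primary_ip, secondary_ip,
    #    [instances with this node as primary],
    #    [instances with this node as secondary])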
    for node in self.wanted_nodes:
      result.append((node.name, node.primary_ip, node.secondary_ip,
                     [inst.name for inst in ilist
                      if inst.primary_node == node.name],
                     [inst.name for inst in ilist
                      if node.name in inst.secondary_nodes]))
    return result


class LUSetInstanceParms(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      args['nics'] = [(ip, bridge)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.bridge = getattr(self.op, "bridge", None)
    if [self.mem, self.vcpus, self.ip, self.bridge].count(None) == 4:
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))

    self.cfg.AddInstance(instance)

    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list([node.name for node in self.nodes])


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance name is a valid one.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    # shutdown the instance, unless requested not to do so
    if self.op.shutdown:
      op = opcodes.OpShutdownInstance(instance_name=instance.name)
      self.processor.ChainOpCode(op, feedback_fn)

    vgname = self.cfg.GetVGName()

    snap_disks = []

    for disk in instance.disks:
      if disk.iv_name == "sda":
        # new_dev_name will be a snapshot of an lvm leaf of the one we passed
        new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
        if not new_dev_name:
          logger.Error("could not snapshot block device %s on node %s" %
                       (disk.logical_id[1], src_node))
        else:
          new_dev = objects.Disk(dev_type="lvm", size=disk.size,
                                 logical_id=(vgname, new_dev_name),
                                 physical_id=(vgname, new_dev_name),
                                 iv_name=disk.iv_name)
          snap_disks.append(new_dev)

    if self.op.shutdown:
      op = opcodes.OpStartupInstance(instance_name=instance.name,
                                     force=False)
      self.processor.ChainOpCode(op, feedback_fn)

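    # the instance was only kept down long enough to take consistent LVM
    # snapshots; the export below works on the snapshots while the
    # instance is already running again
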
    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                      instance):
        logger.Error("could not export block device %s from node"
                     " %s to node %s" %
                     (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from"
                     " node %s" % (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.processor.ChainOpCode(op, feedback_fn)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUAddTag(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      self.target.AddTag(self.op.tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTag(TagsLU):
  """Delete a tag from a given object.

  """
  _OP_REQP = ["kind", "name", "tag"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    objects.TaggableObject.ValidateTag(self.op.tag)
    if self.op.tag not in self.target.GetTags():
      raise errors.OpPrereqError("Tag not found")

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    self.target.RemoveTag(self.op.tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")