# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

# standard library modules used further down in this file
import re
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)

  def __GetSSH(self):
    """Returns the SshRunner object.

    """
    if not self.__ssh:
      self.__ssh = ssh.SshRunner(self.sstore)
    return self.__ssh

  ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

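
# Illustrative sketch (assumptions, not original code): a processor is
# expected to drive a LU roughly in this order -- prerequisites, pre-phase
# hooks, execution, post-phase hooks, with HooksCallBack allowed to adjust
# the final result:
#
#   lu = SomeLU(processor, op, cfg, sstore)  # hypothetical subclass
#   lu.CheckPrereq()
#   env, pre_nodes, post_nodes = lu.BuildHooksEnv()
#   result = lu.Exec(feedback_fn)
#   result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, hook_results,
#                             feedback_fn, result)
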
class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings) or None for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings) or None for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))

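
# Example (hypothetical field names): with static=["name"] and
# dynamic=["mfree"], selected=["name", "mfree"] passes silently, while
# selected=["name", "foo"] raises
#   OpPrereqError("Unknown output fields selected: foo")
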
def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env

def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)

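
# Usage sketch: the override dict replaces individual arguments before they
# are passed on, e.g. (hypothetically)
#   _BuildInstanceHookEnvByObject(instance, override={'status': 'up'})
# forces INSTANCE_STATUS to "up" regardless of the configured status.
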
def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))
    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      bad = True
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if not instanceconfig.status == 'down':
      if (node_current not in node_instance or
          not instance in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if (not node == node_current):
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; their failure is logged
    in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. this is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


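# Shape of LUVerifyDisks.Exec's result, as built above: a 4-tuple of
#   (unreachable nodes, {node: LVM error string},
#    instances with inactive LVs, {instance: [(node, lv), ...] missing LVs})
# e.g., hypothetically: ([], {}, ['inst1'], {'inst2': [('node3', 'vol1')]}).

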
class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV

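
# Illustration (hypothetical disk tree): a DRBD disk built on top of two
# logical volumes has children with dev_type == constants.LD_LV, so the
# recursion returns True; a purely file-backed disk tree would return False.
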
class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass

    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded

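
# Polling behavior in brief (as sketched above): each round queries the
# mirror status on the primary node only, sleeps min(60, max_time) seconds
# between rounds (max_time being the largest per-device time estimate), and
# aborts after repeated RPC failures to the node.
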
def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result

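
# Usage sketch (hypothetical call site): checking a device on the primary
# node before a failover could look like
#   ok = _CheckDiskConsistency(cfg, dev, instance.primary_node, True)
# Passing ldisk=True narrows the test to the local storage status instead
# of the overall is_degraded flag.
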
class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           nodes as keys and list of OS objects as values
           e.g. {"debian-etch": {"node1": [<object>,...],
                                 "node2": [<object>,]}
                }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                val = inst.name
                break
            else:
              val = '-'
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - it is resolvable
    - its parameters (single/dual homed) matches the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info

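
# Note on the return value: each device_info entry pairs the primary node
# with the disk's instance-visible name and the result of the primary-mode
# assemble call, e.g. (hypothetically)
#   [("node1.example.com", "sda", "/dev/drbd0")]
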
def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not type(ins_l) is list:
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node cause the
  shutdown to be reported as failed.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))

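
# Usage sketch, mirroring the call in LUStartupInstance below:
#   _CheckNodeFreeMemory(cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
# With requested=2048 but only 1024 MiB free (hypothetical numbers), this
# raises OpPrereqError("Not enough memory on node ...: needed 2048 MiB,
# available 1024 MiB").
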
class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")


class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)


2085 class LUShutdownInstance(LogicalUnit):
2086 """Shutdown an instance.
2089 HPATH = "instance-stop"
2090 HTYPE = constants.HTYPE_INSTANCE
2091 _OP_REQP = ["instance_name"]
2093 def BuildHooksEnv(self):
2096 This runs on master, primary and secondary nodes of the instance.
2099 env = _BuildInstanceHookEnvByObject(self.instance)
2100 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2101 list(self.instance.secondary_nodes))
2104 def CheckPrereq(self):
2105 """Check prerequisites.
2107 This checks that the instance is in the cluster.
2110 instance = self.cfg.GetInstanceInfo(
2111 self.cfg.ExpandInstanceName(self.op.instance_name))
2112 if instance is None:
2113 raise errors.OpPrereqError("Instance '%s' not known" %
2114 self.op.instance_name)
2115 self.instance = instance
2117 def Exec(self, feedback_fn):
2118 """Shutdown the instance.
2121 instance = self.instance
2122 node_current = instance.primary_node
2123 self.cfg.MarkInstanceDown(instance.name)
2124 if not rpc.call_instance_shutdown(node_current, instance):
2125 logger.Error("could not shutdown instance")
2127 _ShutdownInstanceDisks(instance, self.cfg)
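# Note on ordering (explanatory comment, not original code): the instance is
# marked down in the configuration *before* the node-side shutdown RPC is
# attempted, so the cluster's intended state is recorded even if that call
# fails; the disks are deactivated afterwards in either case.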
2130 class LUReinstallInstance(LogicalUnit):
2131 """Reinstall an instance.
2134 HPATH = "instance-reinstall"
2135 HTYPE = constants.HTYPE_INSTANCE
2136 _OP_REQP = ["instance_name"]
2138 def BuildHooksEnv(self):
2141 This runs on master, primary and secondary nodes of the instance.
2144 env = _BuildInstanceHookEnvByObject(self.instance)
2145 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2146 list(self.instance.secondary_nodes))
2147 return env, nl, nl
2149 def CheckPrereq(self):
2150 """Check prerequisites.
2152 This checks that the instance is in the cluster and is not running.
2155 instance = self.cfg.GetInstanceInfo(
2156 self.cfg.ExpandInstanceName(self.op.instance_name))
2157 if instance is None:
2158 raise errors.OpPrereqError("Instance '%s' not known" %
2159 self.op.instance_name)
2160 if instance.disk_template == constants.DT_DISKLESS:
2161 raise errors.OpPrereqError("Instance '%s' has no disks" %
2162 self.op.instance_name)
2163 if instance.status != "down":
2164 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2165 self.op.instance_name)
2166 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2167 if remote_info:
2168 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2169 (self.op.instance_name,
2170 instance.primary_node))
2172 self.op.os_type = getattr(self.op, "os_type", None)
2173 if self.op.os_type is not None:
2175 pnode = self.cfg.GetNodeInfo(
2176 self.cfg.ExpandNodeName(instance.primary_node))
2177 if pnode is None:
2178 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2179 instance.primary_node)
2180 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2181 if not os_obj:
2182 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2183 " primary node" % self.op.os_type)
2185 self.instance = instance
2187 def Exec(self, feedback_fn):
2188 """Reinstall the instance.
2191 inst = self.instance
2193 if self.op.os_type is not None:
2194 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2195 inst.os = self.op.os_type
2196 self.cfg.AddInstance(inst)
2198 _StartInstanceDisks(self.cfg, inst, None)
2199 try:
2200 feedback_fn("Running the instance OS create scripts...")
2201 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2202 raise errors.OpExecError("Could not install OS for instance %s"
2203 " on node %s" %
2204 (inst.name, inst.primary_node))
2205 finally:
2206 _ShutdownInstanceDisks(inst, self.cfg)
2209 class LURenameInstance(LogicalUnit):
2210 """Rename an instance.
2213 HPATH = "instance-rename"
2214 HTYPE = constants.HTYPE_INSTANCE
2215 _OP_REQP = ["instance_name", "new_name"]
2217 def BuildHooksEnv(self):
2220 This runs on master, primary and secondary nodes of the instance.
2223 env = _BuildInstanceHookEnvByObject(self.instance)
2224 env["INSTANCE_NEW_NAME"] = self.op.new_name
2225 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2226 list(self.instance.secondary_nodes))
2227 return env, nl, nl
2229 def CheckPrereq(self):
2230 """Check prerequisites.
2232 This checks that the instance is in the cluster and is not running.
2235 instance = self.cfg.GetInstanceInfo(
2236 self.cfg.ExpandInstanceName(self.op.instance_name))
2237 if instance is None:
2238 raise errors.OpPrereqError("Instance '%s' not known" %
2239 self.op.instance_name)
2240 if instance.status != "down":
2241 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2242 self.op.instance_name)
2243 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2244 if remote_info:
2245 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2246 (self.op.instance_name,
2247 instance.primary_node))
2248 self.instance = instance
2250 # new name verification
2251 name_info = utils.HostInfo(self.op.new_name)
2253 self.op.new_name = new_name = name_info.name
2254 instance_list = self.cfg.GetInstanceList()
2255 if new_name in instance_list:
2256 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2259 if not getattr(self.op, "ignore_ip", False):
2260 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2261 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2262 (name_info.ip, new_name))
2265 def Exec(self, feedback_fn):
2266 """Reinstall the instance.
2269 inst = self.instance
2270 old_name = inst.name
2272 if inst.disk_template == constants.DT_FILE:
2273 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2275 self.cfg.RenameInstance(inst.name, self.op.new_name)
2277 # re-read the instance from the configuration after rename
2278 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2280 if inst.disk_template == constants.DT_FILE:
2281 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2282 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2283 old_file_storage_dir,
2284 new_file_storage_dir)
2286 if not result:
2287 raise errors.OpExecError("Could not connect to node '%s' to rename"
2288 " directory '%s' to '%s' (but the instance"
2289 " has been renamed in Ganeti)" % (
2290 inst.primary_node, old_file_storage_dir,
2291 new_file_storage_dir))
2294 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2295 " (but the instance has been renamed in"
2296 " Ganeti)" % (old_file_storage_dir,
2297 new_file_storage_dir))
2299 _StartInstanceDisks(self.cfg, inst, None)
2300 try:
2301 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2302 "sda", "sdb"):
2303 msg = ("Could not run OS rename script for instance %s on node %s (but the"
2304 " instance has been renamed in Ganeti)" %
2305 (inst.name, inst.primary_node))
2306 logger.Error(msg)
2307 finally:
2308 _ShutdownInstanceDisks(inst, self.cfg)
2311 class LURemoveInstance(LogicalUnit):
2312 """Remove an instance.
2315 HPATH = "instance-remove"
2316 HTYPE = constants.HTYPE_INSTANCE
2317 _OP_REQP = ["instance_name", "ignore_failures"]
2319 def BuildHooksEnv(self):
2322 This runs on master, primary and secondary nodes of the instance.
2325 env = _BuildInstanceHookEnvByObject(self.instance)
2326 nl = [self.sstore.GetMasterNode()]
2327 return env, nl, nl
2329 def CheckPrereq(self):
2330 """Check prerequisites.
2332 This checks that the instance is in the cluster.
2335 instance = self.cfg.GetInstanceInfo(
2336 self.cfg.ExpandInstanceName(self.op.instance_name))
2337 if instance is None:
2338 raise errors.OpPrereqError("Instance '%s' not known" %
2339 self.op.instance_name)
2340 self.instance = instance
2342 def Exec(self, feedback_fn):
2343 """Remove the instance.
2346 instance = self.instance
2347 logger.Info("shutting down instance %s on node %s" %
2348 (instance.name, instance.primary_node))
2350 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2351 if self.op.ignore_failures:
2352 feedback_fn("Warning: can't shut down instance")
2353 else:
2354 raise errors.OpExecError("Could not shut down instance %s on node %s" %
2355 (instance.name, instance.primary_node))
2357 logger.Info("removing block devices for instance %s" % instance.name)
2359 if not _RemoveDisks(instance, self.cfg):
2360 if self.op.ignore_failures:
2361 feedback_fn("Warning: can't remove instance's disks")
2362 else:
2363 raise errors.OpExecError("Can't remove instance's disks")
2365 logger.Info("removing instance %s out of cluster config" % instance.name)
2367 self.cfg.RemoveInstance(instance.name)
2370 class LUQueryInstances(NoHooksLU):
2371 """Logical unit for querying instances.
2374 _OP_REQP = ["output_fields", "names"]
2376 def CheckPrereq(self):
2377 """Check prerequisites.
2379 This checks that the fields required are valid output fields.
2382 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2383 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2384 "admin_state", "admin_ram",
2385 "disk_template", "ip", "mac", "bridge",
2386 "sda_size", "sdb_size", "vcpus", "tags"],
2387 dynamic=self.dynamic_fields,
2388 selected=self.op.output_fields)
2390 self.wanted = _GetWantedInstances(self, self.op.names)
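# Illustration (sketch): with output_fields == ["name", "pnode", "oper_ram"],
# "oper_ram" intersects self.dynamic_fields, so Exec() below must contact the
# primary nodes via rpc.call_all_instances_info() instead of answering purely
# from the configuration.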
2392 def Exec(self, feedback_fn):
2393 """Computes the list of nodes and their attributes.
2396 instance_names = self.wanted
2397 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2398 in instance_names]
2400 # begin data gathering
2402 nodes = frozenset([inst.primary_node for inst in instance_list])
2404 bad_nodes = []
2405 if self.dynamic_fields.intersection(self.op.output_fields):
2406 live_data = {}
2407 node_data = rpc.call_all_instances_info(nodes)
2408 for name in nodes:
2409 result = node_data[name]
2410 if result:
2411 live_data.update(result)
2412 elif result == False:
2413 bad_nodes.append(name)
2414 # else no instance is alive
2415 else:
2416 live_data = dict([(name, {}) for name in instance_names])
2418 # end data gathering
2420 output = []
2421 for instance in instance_list:
2422 iout = []
2423 for field in self.op.output_fields:
2424 if field == "name":
2425 val = instance.name
2426 elif field == "os":
2427 val = instance.os
2428 elif field == "pnode":
2429 val = instance.primary_node
2430 elif field == "snodes":
2431 val = list(instance.secondary_nodes)
2432 elif field == "admin_state":
2433 val = (instance.status != "down")
2434 elif field == "oper_state":
2435 if instance.primary_node in bad_nodes:
2436 val = None
2437 else:
2438 val = bool(live_data.get(instance.name))
2439 elif field == "status":
2440 if instance.primary_node in bad_nodes:
2441 val = "ERROR_nodedown"
2442 else:
2443 running = bool(live_data.get(instance.name))
2444 if running:
2445 if instance.status != "down":
2446 val = "running"
2447 else:
2448 val = "ERROR_up"
2449 else:
2450 if instance.status != "down":
2451 val = "ERROR_down"
2452 else:
2453 val = "ADMIN_down"
2454 elif field == "admin_ram":
2455 val = instance.memory
2456 elif field == "oper_ram":
2457 if instance.primary_node in bad_nodes:
2458 val = None
2459 elif instance.name in live_data:
2460 val = live_data[instance.name].get("memory", "?")
2461 else:
2462 val = "-"
2463 elif field == "disk_template":
2464 val = instance.disk_template
2465 elif field == "ip":
2466 val = instance.nics[0].ip
2467 elif field == "bridge":
2468 val = instance.nics[0].bridge
2469 elif field == "mac":
2470 val = instance.nics[0].mac
2471 elif field == "sda_size" or field == "sdb_size":
2472 disk = instance.FindDisk(field[:3])
2473 if disk is None:
2474 val = None
2475 else:
2476 val = disk.size
2477 elif field == "vcpus":
2478 val = instance.vcpus
2479 elif field == "tags":
2480 val = list(instance.GetTags())
2481 else:
2482 raise errors.ParameterError(field)
2483 iout.append(val)
2484 output.append(iout)
2486 return output
2489 class LUFailoverInstance(LogicalUnit):
2490 """Failover an instance.
2493 HPATH = "instance-failover"
2494 HTYPE = constants.HTYPE_INSTANCE
2495 _OP_REQP = ["instance_name", "ignore_consistency"]
2497 def BuildHooksEnv(self):
2500 This runs on master, primary and secondary nodes of the instance.
2504 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2506 env.update(_BuildInstanceHookEnvByObject(self.instance))
2507 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2510 def CheckPrereq(self):
2511 """Check prerequisites.
2513 This checks that the instance is in the cluster.
2516 instance = self.cfg.GetInstanceInfo(
2517 self.cfg.ExpandInstanceName(self.op.instance_name))
2518 if instance is None:
2519 raise errors.OpPrereqError("Instance '%s' not known" %
2520 self.op.instance_name)
2522 if instance.disk_template not in constants.DTS_NET_MIRROR:
2523 raise errors.OpPrereqError("Instance's disk layout is not"
2524 " network mirrored, cannot failover.")
2526 secondary_nodes = instance.secondary_nodes
2527 if not secondary_nodes:
2528 raise errors.ProgrammerError("no secondary node but using "
2529 "a mirrored disk template")
2531 target_node = secondary_nodes[0]
2532 # check memory requirements on the secondary node
2533 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2534 instance.name, instance.memory)
2536 # check bridge existence
2537 brlist = [nic.bridge for nic in instance.nics]
2538 if not rpc.call_bridges_exist(target_node, brlist):
2539 raise errors.OpPrereqError("One or more target bridges %s does not"
2540 " exist on destination node '%s'" %
2541 (brlist, target_node))
2543 self.instance = instance
2545 def Exec(self, feedback_fn):
2546 """Failover an instance.
2548 The failover is done by shutting it down on its present node and
2549 starting it on the secondary.
2552 instance = self.instance
2554 source_node = instance.primary_node
2555 target_node = instance.secondary_nodes[0]
2557 feedback_fn("* checking disk consistency between source and target")
2558 for dev in instance.disks:
2559 # for drbd, these are drbd over lvm
2560 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2561 if instance.status == "up" and not self.op.ignore_consistency:
2562 raise errors.OpExecError("Disk %s is degraded on target node,"
2563 " aborting failover." % dev.iv_name)
2565 feedback_fn("* shutting down instance on source node")
2566 logger.Info("Shutting down instance %s on node %s" %
2567 (instance.name, source_node))
2569 if not rpc.call_instance_shutdown(source_node, instance):
2570 if self.op.ignore_consistency:
2571 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2572 " anyway. Please make sure node %s is down" %
2573 (instance.name, source_node, source_node))
2575 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2576 (instance.name, source_node))
2578 feedback_fn("* deactivating the instance's disks on source node")
2579 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2580 raise errors.OpExecError("Can't shut down the instance's disks.")
2582 instance.primary_node = target_node
2583 # distribute new instance config to the other nodes
2584 self.cfg.Update(instance)
2586 # Only start the instance if it's marked as up
2587 if instance.status == "up":
2588 feedback_fn("* activating the instance's disks on target node")
2589 logger.Info("Starting instance %s on node %s" %
2590 (instance.name, target_node))
2592 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2593 ignore_secondaries=True)
2594 if not disks_ok:
2595 _ShutdownInstanceDisks(instance, self.cfg)
2596 raise errors.OpExecError("Can't activate the instance's disks")
2598 feedback_fn("* starting the instance on the target node")
2599 if not rpc.call_instance_start(target_node, instance, None):
2600 _ShutdownInstanceDisks(instance, self.cfg)
2601 raise errors.OpExecError("Could not start instance %s on node %s." %
2602 (instance.name, target_node))
2605 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2606 """Create a tree of block devices on the primary node.
2608 This always creates all devices.
2611 if device.children:
2612 for child in device.children:
2613 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2614 return False
2616 cfg.SetDiskID(device, node)
2617 new_id = rpc.call_blockdev_create(node, device, device.size,
2618 instance.name, True, info)
2619 if not new_id:
2620 return False
2621 if device.physical_id is None:
2622 device.physical_id = new_id
2623 return True
2626 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2627 """Create a tree of block devices on a secondary node.
2629 If this device type has to be created on secondaries, create it and
2630 all its children.
2632 If not, just recurse to children keeping the same 'force' value.
2635 if device.CreateOnSecondary():
2636 force = True
2637 if device.children:
2638 for child in device.children:
2639 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2640 child, force, info):
2641 return False
2643 if not force:
2644 return True
2645 cfg.SetDiskID(device, node)
2646 new_id = rpc.call_blockdev_create(node, device, device.size,
2647 instance.name, False, info)
2648 if not new_id:
2649 return False
2650 if device.physical_id is None:
2651 device.physical_id = new_id
2652 return True
2655 def _GenerateUniqueNames(cfg, exts):
2656 """Generate a suitable LV name.
2658 This will generate a logical volume name for the given instance.
2661 results = []
2662 for val in exts:
2663 new_id = cfg.GenerateUniqueID()
2664 results.append("%s%s" % (new_id, val))
2665 return results
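# Example (sketch): _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns two
# strings of the form "<unique-id>.sda" and "<unique-id>.sdb"; the exact
# shape of the ID depends on cfg.GenerateUniqueID().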
2668 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2669 """Generate a drbd8 device complete with its children.
2672 port = cfg.AllocatePort()
2673 vgname = cfg.GetVGName()
2674 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2675 logical_id=(vgname, names[0]))
2676 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2677 logical_id=(vgname, names[1]))
2678 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2679 logical_id = (primary, secondary, port),
2680 children = [dev_data, dev_meta],
2681 iv_name=iv_name)
2682 return drbd_dev
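# Resulting device tree (sketch):
#
#   DRBD8 device, iv_name       logical_id = (primary, secondary, port)
#     |- LV data volume         size MiB, logical_id = (vgname, names[0])
#     `- LV metadata volume     128 MiB,  logical_id = (vgname, names[1])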
2685 def _GenerateDiskTemplate(cfg, template_name,
2686 instance_name, primary_node,
2687 secondary_nodes, disk_sz, swap_sz,
2688 file_storage_dir, file_driver):
2689 """Generate the entire disk layout for a given template type.
2692 #TODO: compute space requirements
2694 vgname = cfg.GetVGName()
2695 if template_name == constants.DT_DISKLESS:
2696 disks = []
2697 elif template_name == constants.DT_PLAIN:
2698 if len(secondary_nodes) != 0:
2699 raise errors.ProgrammerError("Wrong template configuration")
2701 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2702 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2703 logical_id=(vgname, names[0]),
2704 iv_name="sda")
2705 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2706 logical_id=(vgname, names[1]),
2707 iv_name="sdb")
2708 disks = [sda_dev, sdb_dev]
2709 elif template_name == constants.DT_DRBD8:
2710 if len(secondary_nodes) != 1:
2711 raise errors.ProgrammerError("Wrong template configuration")
2712 remote_node = secondary_nodes[0]
2713 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2714 ".sdb_data", ".sdb_meta"])
2715 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2716 disk_sz, names[0:2], "sda")
2717 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2718 swap_sz, names[2:4], "sdb")
2719 disks = [drbd_sda_dev, drbd_sdb_dev]
2720 elif template_name == constants.DT_FILE:
2721 if len(secondary_nodes) != 0:
2722 raise errors.ProgrammerError("Wrong template configuration")
2724 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2725 iv_name="sda", logical_id=(file_driver,
2726 "%s/sda" % file_storage_dir))
2727 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2728 iv_name="sdb", logical_id=(file_driver,
2729 "%s/sdb" % file_storage_dir))
2730 disks = [file_sda_dev, file_sdb_dev]
2732 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2736 def _GetInstanceInfoText(instance):
2737 """Compute that text that should be added to the disk's metadata.
2740 return "originstname+%s" % instance.name
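# Example: for an instance named "inst1.example.com" this returns the string
# "originstname+inst1.example.com".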
2743 def _CreateDisks(cfg, instance):
2744 """Create all disks for an instance.
2746 This abstracts away some work from AddInstance.
2749 instance: the instance object
2752 True or False showing the success of the creation process
2755 info = _GetInstanceInfoText(instance)
2757 if instance.disk_template == constants.DT_FILE:
2758 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2759 result = rpc.call_file_storage_dir_create(instance.primary_node,
2760 file_storage_dir)
2762 if not result:
2763 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2764 return False
2766 if not result[0]:
2767 logger.Error("failed to create directory '%s'" % file_storage_dir)
2768 return False
2770 for device in instance.disks:
2771 logger.Info("creating volume %s for instance %s" %
2772 (device.iv_name, instance.name))
2774 for secondary_node in instance.secondary_nodes:
2775 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2776 device, False, info):
2777 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2778 (device.iv_name, device, secondary_node))
2779 return False
2781 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2782 instance, device, info):
2783 logger.Error("failed to create volume %s on primary!" %
2784 device.iv_name)
2785 return False
2787 return True
2790 def _RemoveDisks(instance, cfg):
2791 """Remove all disks for an instance.
2793 This abstracts away some work from `AddInstance()` and
2794 `RemoveInstance()`. Note that in case some of the devices couldn't
2795 be removed, the removal will continue with the other ones (compare
2796 with `_CreateDisks()`).
2799 instance: the instance object
2802 True or False showing the success of the removal process
2805 logger.Info("removing block devices for instance %s" % instance.name)
2807 result = True
2808 for device in instance.disks:
2809 for node, disk in device.ComputeNodeTree(instance.primary_node):
2810 cfg.SetDiskID(disk, node)
2811 if not rpc.call_blockdev_remove(node, disk):
2812 logger.Error("could not remove block device %s on node %s,"
2813 " continuing anyway" %
2814 (device.iv_name, node))
2815 result = False
2817 if instance.disk_template == constants.DT_FILE:
2818 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2819 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2820 file_storage_dir):
2821 logger.Error("could not remove directory '%s'" % file_storage_dir)
2822 result = False
2824 return result
2827 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2828 """Compute disk size requirements in the volume group
2830 This is currently hard-coded for the two-drive layout.
2833 # Required free disk space as a function of disk and swap space
2834 req_size_dict = {
2835 constants.DT_DISKLESS: None,
2836 constants.DT_PLAIN: disk_size + swap_size,
2837 # 256 MB are added for drbd metadata, 128MB for each drbd device
2838 constants.DT_DRBD8: disk_size + swap_size + 256,
2839 constants.DT_FILE: None,
2840 }
2842 if disk_template not in req_size_dict:
2843 raise errors.ProgrammerError("Disk template '%s' size requirement"
2844 " is unknown" % disk_template)
2846 return req_size_dict[disk_template]
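# Worked example: for a DRBD8 instance with disk_size=10240 and swap_size=4096
# (in MiB), each node's volume group must hold 10240 + 4096 + 256 = 14592 MiB;
# the extra 256 MiB accounts for the two 128 MiB DRBD metadata LVs created by
# _GenerateDRBD8Branch:
#
#   _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)  # => 14592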
2849 class LUCreateInstance(LogicalUnit):
2850 """Create an instance.
2853 HPATH = "instance-add"
2854 HTYPE = constants.HTYPE_INSTANCE
2855 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2856 "disk_template", "swap_size", "mode", "start", "vcpus",
2857 "wait_for_sync", "ip_check", "mac"]
2859 def _RunAllocator(self):
2860 """Run the allocator based on input opcode.
2863 disks = [{"size": self.op.disk_size, "mode": "w"},
2864 {"size": self.op.swap_size, "mode": "w"}]
2865 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2866 "bridge": self.op.bridge}]
2867 ial = IAllocator(self.cfg, self.sstore,
2868 mode=constants.IALLOCATOR_MODE_ALLOC,
2869 name=self.op.instance_name,
2870 disk_template=self.op.disk_template,
2871 tags=[],
2872 os=self.op.os_type,
2873 vcpus=self.op.vcpus,
2874 mem_size=self.op.mem_size,
2875 disks=disks,
2876 nics=nics,
2877 )
2879 ial.Run(self.op.iallocator)
2882 raise errors.OpPrereqError("Can't compute nodes using"
2883 " iallocator '%s': %s" % (self.op.iallocator,
2885 if len(ial.nodes) != ial.required_nodes:
2886 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2887 " of nodes (%s), required %s" %
2888 (len(ial.nodes), ial.required_nodes))
2889 self.op.pnode = ial.nodes[0]
2890 logger.ToStdout("Selected nodes for the instance: %s" %
2891 (", ".join(ial.nodes),))
2892 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2893 (self.op.instance_name, self.op.iallocator, ial.nodes))
2894 if ial.required_nodes == 2:
2895 self.op.snode = ial.nodes[1]
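# Sketch of the allocator contract assumed above: IAllocator.Run() fills in
# ial.success, ial.info, ial.nodes and ial.required_nodes; for a net-mirrored
# disk template required_nodes is 2, so ial.nodes[0] becomes the primary and
# ial.nodes[1] the secondary node.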
2897 def BuildHooksEnv(self):
2900 This runs on master, primary and secondary nodes of the instance.
2904 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2905 "INSTANCE_DISK_SIZE": self.op.disk_size,
2906 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2907 "INSTANCE_ADD_MODE": self.op.mode,
2909 if self.op.mode == constants.INSTANCE_IMPORT:
2910 env["INSTANCE_SRC_NODE"] = self.op.src_node
2911 env["INSTANCE_SRC_PATH"] = self.op.src_path
2912 env["INSTANCE_SRC_IMAGE"] = self.src_image
2914 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2915 primary_node=self.op.pnode,
2916 secondary_nodes=self.secondaries,
2917 status=self.instance_status,
2918 os_type=self.op.os_type,
2919 memory=self.op.mem_size,
2920 vcpus=self.op.vcpus,
2921 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2922 ))
2924 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2925 self.secondaries)
2926 return env, nl, nl
2929 def CheckPrereq(self):
2930 """Check prerequisites.
2933 # set optional parameters to none if they don't exist
2934 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
2935 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
2936 "vnc_bind_address"]:
2937 if not hasattr(self.op, attr):
2938 setattr(self.op, attr, None)
2940 if self.op.mode not in (constants.INSTANCE_CREATE,
2941 constants.INSTANCE_IMPORT):
2942 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
2945 if (not self.cfg.GetVGName() and
2946 self.op.disk_template not in constants.DTS_NOT_LVM):
2947 raise errors.OpPrereqError("Cluster does not support lvm-based"
2950 if self.op.mode == constants.INSTANCE_IMPORT:
2951 src_node = getattr(self.op, "src_node", None)
2952 src_path = getattr(self.op, "src_path", None)
2953 if src_node is None or src_path is None:
2954 raise errors.OpPrereqError("Importing an instance requires source"
2955 " node and path options")
2956 src_node_full = self.cfg.ExpandNodeName(src_node)
2957 if src_node_full is None:
2958 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
2959 self.op.src_node = src_node = src_node_full
2961 if not os.path.isabs(src_path):
2962 raise errors.OpPrereqError("The source path must be absolute")
2964 export_info = rpc.call_export_info(src_node, src_path)
2966 if not export_info:
2967 raise errors.OpPrereqError("No export found in dir %s" % src_path)
2969 if not export_info.has_section(constants.INISECT_EXP):
2970 raise errors.ProgrammerError("Corrupted export config")
2972 ei_version = export_info.get(constants.INISECT_EXP, 'version')
2973 if (int(ei_version) != constants.EXPORT_VERSION):
2974 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
2975 (ei_version, constants.EXPORT_VERSION))
2977 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
2978 raise errors.OpPrereqError("Can't import instance with more than"
2981 # FIXME: are the old OSes, disk sizes, etc. useful?
2982 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
2983 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
2984 'disk0_dump'))
2985 self.src_image = diskimage
2986 else: # INSTANCE_CREATE
2987 if getattr(self.op, "os_type", None) is None:
2988 raise errors.OpPrereqError("No guest OS specified")
2990 #### instance parameters check
2992 # disk template and mirror node verification
2993 if self.op.disk_template not in constants.DISK_TEMPLATES:
2994 raise errors.OpPrereqError("Invalid disk template name")
2996 # instance name verification
2997 hostname1 = utils.HostInfo(self.op.instance_name)
2999 self.op.instance_name = instance_name = hostname1.name
3000 instance_list = self.cfg.GetInstanceList()
3001 if instance_name in instance_list:
3002 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3005 # ip validity checks
3006 ip = getattr(self.op, "ip", None)
3007 if ip is None or ip.lower() == "none":
3008 inst_ip = None
3009 elif ip.lower() == "auto":
3010 inst_ip = hostname1.ip
3011 else:
3012 if not utils.IsValidIP(ip):
3013 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3014 " like a valid IP" % ip)
3015 inst_ip = ip
3016 self.inst_ip = self.op.ip = inst_ip
3018 if self.op.start and not self.op.ip_check:
3019 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3020 " adding an instance in start mode")
3022 if self.op.ip_check:
3023 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3024 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3025 (hostname1.ip, instance_name))
3027 # MAC address verification
3028 if self.op.mac != "auto":
3029 if not utils.IsValidMac(self.op.mac.lower()):
3030 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3033 # bridge verification
3034 bridge = getattr(self.op, "bridge", None)
3035 if bridge is None:
3036 self.op.bridge = self.cfg.GetDefBridge()
3037 else:
3038 self.op.bridge = bridge
3040 # boot order verification
3041 if self.op.hvm_boot_order is not None:
3042 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3043 raise errors.OpPrereqError("invalid boot order specified,"
3044 " must be one or more of [acdn]")
3045 # file storage checks
3046 if (self.op.file_driver and
3047 not self.op.file_driver in constants.FILE_DRIVER):
3048 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3049 self.op.file_driver)
3051 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3052 raise errors.OpPrereqError("File storage directory not a relative"
3056 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3057 raise errors.OpPrereqError("One and only one of iallocator and primary"
3058 " node must be given")
3060 if self.op.iallocator is not None:
3061 self._RunAllocator()
3063 #### node related checks
3065 # check primary node
3066 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3067 if pnode is None:
3068 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3069 self.op.pnode)
3070 self.op.pnode = pnode.name
3071 self.pnode = pnode
3072 self.secondaries = []
3074 # mirror node verification
3075 if self.op.disk_template in constants.DTS_NET_MIRROR:
3076 if getattr(self.op, "snode", None) is None:
3077 raise errors.OpPrereqError("The networked disk templates need"
3080 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3081 if snode_name is None:
3082 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3084 elif snode_name == pnode.name:
3085 raise errors.OpPrereqError("The secondary node cannot be"
3086 " the primary node.")
3087 self.secondaries.append(snode_name)
3089 req_size = _ComputeDiskSize(self.op.disk_template,
3090 self.op.disk_size, self.op.swap_size)
3092 # Check lv size requirements
3093 if req_size is not None:
3094 nodenames = [pnode.name] + self.secondaries
3095 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3096 for node in nodenames:
3097 info = nodeinfo.get(node, None)
3098 if not info:
3099 raise errors.OpPrereqError("Cannot get current information"
3100 " from node '%s'" % node)
3101 vg_free = info.get('vg_free', None)
3102 if not isinstance(vg_free, int):
3103 raise errors.OpPrereqError("Can't compute free disk space on"
3105 if req_size > info['vg_free']:
3106 raise errors.OpPrereqError("Not enough disk space on target node %s."
3107 " %d MB available, %d MB required" %
3108 (node, info['vg_free'], req_size))
3111 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3112 if not os_obj:
3113 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3114 " primary node" % self.op.os_type)
3116 if self.op.kernel_path == constants.VALUE_NONE:
3117 raise errors.OpPrereqError("Can't set instance kernel to none")
3120 # bridge check on primary node
3121 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3122 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3123 " destination node '%s'" %
3124 (self.op.bridge, pnode.name))
3126 # memory check on primary node
3127 if self.op.start:
3128 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3129 "creating instance %s" % self.op.instance_name,
3130 self.op.mem_size)
3132 # hvm_cdrom_image_path verification
3133 if self.op.hvm_cdrom_image_path is not None:
3134 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3135 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3136 " be an absolute path or None, not %s" %
3137 self.op.hvm_cdrom_image_path)
3138 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3139 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3140 " regular file or a symlink pointing to"
3141 " an existing regular file, not %s" %
3142 self.op.hvm_cdrom_image_path)
3144 # vnc_bind_address verification
3145 if self.op.vnc_bind_address is not None:
3146 if not utils.IsValidIP(self.op.vnc_bind_address):
3147 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3148 " like a valid IP address" %
3149 self.op.vnc_bind_address)
3151 if self.op.start:
3152 self.instance_status = 'up'
3153 else:
3154 self.instance_status = 'down'
3156 def Exec(self, feedback_fn):
3157 """Create and add the instance to the cluster.
3160 instance = self.op.instance_name
3161 pnode_name = self.pnode.name
3163 if self.op.mac == "auto":
3164 mac_address = self.cfg.GenerateMAC()
3166 mac_address = self.op.mac
3168 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3169 if self.inst_ip is not None:
3170 nic.ip = self.inst_ip
3172 ht_kind = self.sstore.GetHypervisorType()
3173 if ht_kind in constants.HTS_REQ_PORT:
3174 network_port = self.cfg.AllocatePort()
3175 else:
3176 network_port = None
3178 if self.op.vnc_bind_address is None:
3179 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3181 # this is needed because os.path.join does not accept None arguments
3182 if self.op.file_storage_dir is None:
3183 string_file_storage_dir = ""
3185 string_file_storage_dir = self.op.file_storage_dir
3187 # build the full file storage dir path
3188 file_storage_dir = os.path.normpath(os.path.join(
3189 self.sstore.GetFileStorageDir(),
3190 string_file_storage_dir, instance))
3193 disks = _GenerateDiskTemplate(self.cfg,
3194 self.op.disk_template,
3195 instance, pnode_name,
3196 self.secondaries, self.op.disk_size,
3197 self.op.swap_size,
3198 file_storage_dir,
3199 self.op.file_driver)
3201 iobj = objects.Instance(name=instance, os=self.op.os_type,
3202 primary_node=pnode_name,
3203 memory=self.op.mem_size,
3204 vcpus=self.op.vcpus,
3205 nics=[nic], disks=disks,
3206 disk_template=self.op.disk_template,
3207 status=self.instance_status,
3208 network_port=network_port,
3209 kernel_path=self.op.kernel_path,
3210 initrd_path=self.op.initrd_path,
3211 hvm_boot_order=self.op.hvm_boot_order,
3212 hvm_acpi=self.op.hvm_acpi,
3213 hvm_pae=self.op.hvm_pae,
3214 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3215 vnc_bind_address=self.op.vnc_bind_address,
3216 )
3218 feedback_fn("* creating instance disks...")
3219 if not _CreateDisks(self.cfg, iobj):
3220 _RemoveDisks(iobj, self.cfg)
3221 raise errors.OpExecError("Device creation failed, reverting...")
3223 feedback_fn("adding instance %s to cluster config" % instance)
3225 self.cfg.AddInstance(iobj)
3227 if self.op.wait_for_sync:
3228 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3229 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3230 # make sure the disks are not degraded (still sync-ing is ok)
3232 feedback_fn("* checking mirrors status")
3233 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3234 else:
3235 disk_abort = False
3237 if disk_abort:
3238 _RemoveDisks(iobj, self.cfg)
3239 self.cfg.RemoveInstance(iobj.name)
3240 raise errors.OpExecError("There are some degraded disks for"
3243 feedback_fn("creating os for instance %s on node %s" %
3244 (instance, pnode_name))
3246 if iobj.disk_template != constants.DT_DISKLESS:
3247 if self.op.mode == constants.INSTANCE_CREATE:
3248 feedback_fn("* running the instance OS create scripts...")
3249 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3250 raise errors.OpExecError("could not add os for instance %s"
3252 (instance, pnode_name))
3254 elif self.op.mode == constants.INSTANCE_IMPORT:
3255 feedback_fn("* running the instance OS import scripts...")
3256 src_node = self.op.src_node
3257 src_image = self.src_image
3258 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3259 src_node, src_image):
3260 raise errors.OpExecError("Could not import os for instance"
3262 (instance, pnode_name))
3263 else:
3264 # also checked in the prereq part
3265 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3266 % self.op.mode)
3269 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3270 feedback_fn("* starting instance...")
3271 if not rpc.call_instance_start(pnode_name, iobj, None):
3272 raise errors.OpExecError("Could not start instance")
3275 class LUConnectConsole(NoHooksLU):
3276 """Connect to an instance's console.
3278 This is somewhat special in that it returns the command line that
3279 you need to run on the master node in order to connect to the
3280 console.
3283 _OP_REQP = ["instance_name"]
3285 def CheckPrereq(self):
3286 """Check prerequisites.
3288 This checks that the instance is in the cluster.
3291 instance = self.cfg.GetInstanceInfo(
3292 self.cfg.ExpandInstanceName(self.op.instance_name))
3293 if instance is None:
3294 raise errors.OpPrereqError("Instance '%s' not known" %
3295 self.op.instance_name)
3296 self.instance = instance
3298 def Exec(self, feedback_fn):
3299 """Connect to the console of an instance
3302 instance = self.instance
3303 node = instance.primary_node
3305 node_insts = rpc.call_instance_list([node])[node]
3306 if node_insts is False:
3307 raise errors.OpExecError("Can't connect to node %s." % node)
3309 if instance.name not in node_insts:
3310 raise errors.OpExecError("Instance %s is not running." % instance.name)
3312 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3314 hyper = hypervisor.GetHypervisor()
3315 console_cmd = hyper.GetShellCommandForConsole(instance)
3318 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
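# Illustrative result (hedged; the exact command depends on the hypervisor):
# for Xen, GetShellCommandForConsole() is typically something along the lines
# of "xm console <instance>", which BuildCmd wraps into an ssh invocation
# such as: ssh -t root@<primary-node> 'xm console inst1.example.com'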
3321 class LUReplaceDisks(LogicalUnit):
3322 """Replace the disks of an instance.
3325 HPATH = "mirrors-replace"
3326 HTYPE = constants.HTYPE_INSTANCE
3327 _OP_REQP = ["instance_name", "mode", "disks"]
3329 def _RunAllocator(self):
3330 """Compute a new secondary node using an IAllocator.
3333 ial = IAllocator(self.cfg, self.sstore,
3334 mode=constants.IALLOCATOR_MODE_RELOC,
3335 name=self.op.instance_name,
3336 relocate_from=[self.sec_node])
3338 ial.Run(self.op.iallocator)
3341 raise errors.OpPrereqError("Can't compute nodes using"
3342 " iallocator '%s': %s" % (self.op.iallocator,
3344 if len(ial.nodes) != ial.required_nodes:
3345 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3346 " of nodes (%s), required %s" %
3347 (len(ial.nodes), ial.required_nodes))
3348 self.op.remote_node = ial.nodes[0]
3349 logger.ToStdout("Selected new secondary for the instance: %s" %
3350 self.op.remote_node)
3352 def BuildHooksEnv(self):
3355 This runs on the master, the primary and all the secondaries.
3359 "MODE": self.op.mode,
3360 "NEW_SECONDARY": self.op.remote_node,
3361 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3363 env.update(_BuildInstanceHookEnvByObject(self.instance))
3365 self.sstore.GetMasterNode(),
3366 self.instance.primary_node,
3368 if self.op.remote_node is not None:
3369 nl.append(self.op.remote_node)
3372 def CheckPrereq(self):
3373 """Check prerequisites.
3375 This checks that the instance is in the cluster.
3378 if not hasattr(self.op, "remote_node"):
3379 self.op.remote_node = None
3381 instance = self.cfg.GetInstanceInfo(
3382 self.cfg.ExpandInstanceName(self.op.instance_name))
3383 if instance is None:
3384 raise errors.OpPrereqError("Instance '%s' not known" %
3385 self.op.instance_name)
3386 self.instance = instance
3387 self.op.instance_name = instance.name
3389 if instance.disk_template not in constants.DTS_NET_MIRROR:
3390 raise errors.OpPrereqError("Instance's disk layout is not"
3391 " network mirrored.")
3393 if len(instance.secondary_nodes) != 1:
3394 raise errors.OpPrereqError("The instance has a strange layout,"
3395 " expected one secondary but found %d" %
3396 len(instance.secondary_nodes))
3398 self.sec_node = instance.secondary_nodes[0]
3400 ia_name = getattr(self.op, "iallocator", None)
3401 if ia_name is not None:
3402 if self.op.remote_node is not None:
3403 raise errors.OpPrereqError("Give either the iallocator or the new"
3404 " secondary, not both")
3405 self.op.remote_node = self._RunAllocator()
3407 remote_node = self.op.remote_node
3408 if remote_node is not None:
3409 remote_node = self.cfg.ExpandNodeName(remote_node)
3410 if remote_node is None:
3411 raise errors.OpPrereqError("Node '%s' not known" %
3412 self.op.remote_node)
3413 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3415 self.remote_node_info = None
3416 if remote_node == instance.primary_node:
3417 raise errors.OpPrereqError("The specified node is the primary node of"
3419 elif remote_node == self.sec_node:
3420 if self.op.mode == constants.REPLACE_DISK_SEC:
3421 # this is for DRBD8, where we can't execute the same mode of
3422 # replacement as for drbd7 (no different port allocated)
3423 raise errors.OpPrereqError("Same secondary given, cannot execute"
3425 if instance.disk_template == constants.DT_DRBD8:
3426 if (self.op.mode == constants.REPLACE_DISK_ALL and
3427 remote_node is not None):
3428 # switch to replace secondary mode
3429 self.op.mode = constants.REPLACE_DISK_SEC
3431 if self.op.mode == constants.REPLACE_DISK_ALL:
3432 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3433 " secondary disk replacement, not"
3435 elif self.op.mode == constants.REPLACE_DISK_PRI:
3436 if remote_node is not None:
3437 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3438 " the secondary while doing a primary"
3439 " node disk replacement")
3440 self.tgt_node = instance.primary_node
3441 self.oth_node = instance.secondary_nodes[0]
3442 elif self.op.mode == constants.REPLACE_DISK_SEC:
3443 self.new_node = remote_node # this can be None, in which case
3444 # we don't change the secondary
3445 self.tgt_node = instance.secondary_nodes[0]
3446 self.oth_node = instance.primary_node
3448 raise errors.ProgrammerError("Unhandled disk replace mode")
3450 for name in self.op.disks:
3451 if instance.FindDisk(name) is None:
3452 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3453 (name, instance.name))
3454 self.op.remote_node = remote_node
3456 def _ExecD8DiskOnly(self, feedback_fn):
3457 """Replace a disk on the primary or secondary for dbrd8.
3459 The algorithm for replace is quite complicated:
3460 - for each disk to be replaced:
3461 - create new LVs on the target node with unique names
3462 - detach old LVs from the drbd device
3463 - rename old LVs to name_replaced.<time_t>
3464 - rename new LVs to old LVs
3465 - attach the new LVs (with the old names now) to the drbd device
3466 - wait for sync across all devices
3467 - for each modified disk:
3468 - remove old LVs (which have the name name_replaced.<time_t>)
3470 Failures are not very well handled.
3473 steps_total = 6
3474 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3475 instance = self.instance
3476 iv_names = {}
3477 vgname = self.cfg.GetVGName()
3478 # start of work
3479 cfg = self.cfg
3480 tgt_node = self.tgt_node
3481 oth_node = self.oth_node
3483 # Step: check device activation
3484 self.proc.LogStep(1, steps_total, "check device existence")
3485 info("checking volume groups")
3486 my_vg = cfg.GetVGName()
3487 results = rpc.call_vg_list([oth_node, tgt_node])
3488 if not results:
3489 raise errors.OpExecError("Can't list volume groups on the nodes")
3490 for node in oth_node, tgt_node:
3491 res = results.get(node, False)
3492 if not res or my_vg not in res:
3493 raise errors.OpExecError("Volume group '%s' not found on %s" %
3495 for dev in instance.disks:
3496 if not dev.iv_name in self.op.disks:
3497 continue
3498 for node in tgt_node, oth_node:
3499 info("checking %s on %s" % (dev.iv_name, node))
3500 cfg.SetDiskID(dev, node)
3501 if not rpc.call_blockdev_find(node, dev):
3502 raise errors.OpExecError("Can't find device %s on node %s" %
3503 (dev.iv_name, node))
3505 # Step: check other node consistency
3506 self.proc.LogStep(2, steps_total, "check peer consistency")
3507 for dev in instance.disks:
3508 if not dev.iv_name in self.op.disks:
3509 continue
3510 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3511 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3512 oth_node==instance.primary_node):
3513 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3514 " to replace disks on this node (%s)" %
3515 (oth_node, tgt_node))
3517 # Step: create new storage
3518 self.proc.LogStep(3, steps_total, "allocate new storage")
3519 for dev in instance.disks:
3520 if not dev.iv_name in self.op.disks:
3521 continue
3522 size = dev.size
3523 cfg.SetDiskID(dev, tgt_node)
3524 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3525 names = _GenerateUniqueNames(cfg, lv_names)
3526 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3527 logical_id=(vgname, names[0]))
3528 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3529 logical_id=(vgname, names[1]))
3530 new_lvs = [lv_data, lv_meta]
3531 old_lvs = dev.children
3532 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3533 info("creating new local storage on %s for %s" %
3534 (tgt_node, dev.iv_name))
3535 # since we *always* want to create this LV, we use the
3536 # _Create...OnPrimary (which forces the creation), even if we
3537 # are talking about the secondary node
3538 for new_lv in new_lvs:
3539 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3540 _GetInstanceInfoText(instance)):
3541 raise errors.OpExecError("Failed to create new LV named '%s' on"
3543 (new_lv.logical_id[1], tgt_node))
3545 # Step: for each lv, detach+rename*2+attach
3546 self.proc.LogStep(4, steps_total, "change drbd configuration")
3547 for dev, old_lvs, new_lvs in iv_names.itervalues():
3548 info("detaching %s drbd from local storage" % dev.iv_name)
3549 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3550 raise errors.OpExecError("Can't detach drbd from local storage on node"
3551 " %s for device %s" % (tgt_node, dev.iv_name))
3553 #cfg.Update(instance)
3555 # ok, we created the new LVs, so now we know we have the needed
3556 # storage; as such, we proceed on the target node to rename
3557 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3558 # using the assumption that logical_id == physical_id (which in
3559 # turn is the unique_id on that node)
3561 # FIXME(iustin): use a better name for the replaced LVs
3562 temp_suffix = int(time.time())
3563 ren_fn = lambda d, suff: (d.physical_id[0],
3564 d.physical_id[1] + "_replaced-%s" % suff)
3565 # build the rename list based on what LVs exist on the node
3566 rlist = []
3567 for to_ren in old_lvs:
3568 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3569 if find_res is not None: # device exists
3570 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3572 info("renaming the old LVs on the target node")
3573 if not rpc.call_blockdev_rename(tgt_node, rlist):
3574 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3575 # now we rename the new LVs to the old LVs
3576 info("renaming the new LVs on the target node")
3577 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3578 if not rpc.call_blockdev_rename(tgt_node, rlist):
3579 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3581 for old, new in zip(old_lvs, new_lvs):
3582 new.logical_id = old.logical_id
3583 cfg.SetDiskID(new, tgt_node)
3585 for disk in old_lvs:
3586 disk.logical_id = ren_fn(disk, temp_suffix)
3587 cfg.SetDiskID(disk, tgt_node)
3589 # now that the new lvs have the old name, we can add them to the device
3590 info("adding new mirror component on %s" % tgt_node)
3591 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3592 for new_lv in new_lvs:
3593 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3594 warning("Can't rollback device %s", hint="manually cleanup unused"
3596 raise errors.OpExecError("Can't add local storage to drbd")
3598 dev.children = new_lvs
3599 cfg.Update(instance)
3601 # Step: wait for sync
3603 # this can fail as the old devices are degraded and _WaitForSync
3604 # does a combined result over all disks, so we don't check its
3605 # return value
3606 self.proc.LogStep(5, steps_total, "sync devices")
3607 _WaitForSync(cfg, instance, self.proc, unlock=True)
3609 # so check manually all the devices
3610 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3611 cfg.SetDiskID(dev, instance.primary_node)
3612 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3613 if is_degr:
3614 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3616 # Step: remove old storage
3617 self.proc.LogStep(6, steps_total, "removing old storage")
3618 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3619 info("remove logical volumes for %s" % name)
3621 cfg.SetDiskID(lv, tgt_node)
3622 if not rpc.call_blockdev_remove(tgt_node, lv):
3623 warning("Can't remove old LV", hint="manually remove unused LVs")
3626 def _ExecD8Secondary(self, feedback_fn):
3627 """Replace the secondary node for drbd8.
3629 The algorithm for replace is quite complicated:
3630 - for all disks of the instance:
3631 - create new LVs on the new node with same names
3632 - shut down the drbd device on the old secondary
3633 - disconnect the drbd network on the primary
3634 - create the drbd device on the new secondary
3635 - network attach the drbd on the primary, using an artifice:
3636 the drbd code for Attach() will connect to the network if it
3637 finds a device which is connected to the good local disks but
3638 not network enabled
3639 - wait for sync across all devices
3640 - remove all disks from the old secondary
3642 Failures are not very well handled.
3645 steps_total = 6
3646 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3647 instance = self.instance
3648 iv_names = {}
3649 vgname = self.cfg.GetVGName()
3650 # start of work
3651 cfg = self.cfg
3652 old_node = self.tgt_node
3653 new_node = self.new_node
3654 pri_node = instance.primary_node
3656 # Step: check device activation
3657 self.proc.LogStep(1, steps_total, "check device existence")
3658 info("checking volume groups")
3659 my_vg = cfg.GetVGName()
3660 results = rpc.call_vg_list([pri_node, new_node])
3661 if not results:
3662 raise errors.OpExecError("Can't list volume groups on the nodes")
3663 for node in pri_node, new_node:
3664 res = results.get(node, False)
3665 if not res or my_vg not in res:
3666 raise errors.OpExecError("Volume group '%s' not found on %s" %
3668 for dev in instance.disks:
3669 if not dev.iv_name in self.op.disks:
3670 continue
3671 info("checking %s on %s" % (dev.iv_name, pri_node))
3672 cfg.SetDiskID(dev, pri_node)
3673 if not rpc.call_blockdev_find(pri_node, dev):
3674 raise errors.OpExecError("Can't find device %s on node %s" %
3675 (dev.iv_name, pri_node))
3677 # Step: check other node consistency
3678 self.proc.LogStep(2, steps_total, "check peer consistency")
3679 for dev in instance.disks:
3680 if not dev.iv_name in self.op.disks:
3681 continue
3682 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3683 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3684 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3685 " unsafe to replace the secondary" %
3688 # Step: create new storage
3689 self.proc.LogStep(3, steps_total, "allocate new storage")
3690 for dev in instance.disks:
3692 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3693 # since we *always* want to create this LV, we use the
3694 # _Create...OnPrimary (which forces the creation), even if we
3695 # are talking about the secondary node
3696 for new_lv in dev.children:
3697 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3698 _GetInstanceInfoText(instance)):
3699 raise errors.OpExecError("Failed to create new LV named '%s' on"
3701 (new_lv.logical_id[1], new_node))
3703 iv_names[dev.iv_name] = (dev, dev.children)
3705 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3706 for dev in instance.disks:
3708 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3709 # create new devices on new_node
3710 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3711 logical_id=(pri_node, new_node,
3712 dev.logical_id[2]),
3713 children=dev.children)
3714 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3715 new_drbd, False,
3716 _GetInstanceInfoText(instance)):
3717 raise errors.OpExecError("Failed to create new DRBD on"
3718 " node '%s'" % new_node)
3720 for dev in instance.disks:
3721 # we have new devices, shutdown the drbd on the old secondary
3722 info("shutting down drbd for %s on old node" % dev.iv_name)
3723 cfg.SetDiskID(dev, old_node)
3724 if not rpc.call_blockdev_shutdown(old_node, dev):
3725 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3726 hint="Please cleanup this device manually as soon as possible")
3728 info("detaching primary drbds from the network (=> standalone)")
3729 done = 0
3730 for dev in instance.disks:
3731 cfg.SetDiskID(dev, pri_node)
3732 # set the physical (unique in bdev terms) id to None, meaning
3733 # detach from network
3734 dev.physical_id = (None,) * len(dev.physical_id)
3735 # and 'find' the device, which will 'fix' it to match the
3736 # standalone state
3737 if rpc.call_blockdev_find(pri_node, dev):
3738 done += 1
3739 else:
3740 warning("Failed to detach drbd %s from network, unusual case" %
3741 dev.iv_name)
3743 if not done:
3744 # no detaches succeeded (very unlikely)
3745 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3747 # if we managed to detach at least one, we update all the disks of
3748 # the instance to point to the new secondary
3749 info("updating instance configuration")
3750 for dev in instance.disks:
3751 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3752 cfg.SetDiskID(dev, pri_node)
3753 cfg.Update(instance)
3755 # and now perform the drbd attach
3756 info("attaching primary drbds to new secondary (standalone => connected)")
3758 for dev in instance.disks:
3759 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3760 # since the attach is smart, it's enough to 'find' the device,
3761 # it will automatically activate the network, if the physical_id
3762 # is correct
3763 cfg.SetDiskID(dev, pri_node)
3764 if not rpc.call_blockdev_find(pri_node, dev):
3765 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3766 "please do a gnt-instance info to see the status of disks")
3768 # this can fail as the old devices are degraded and _WaitForSync
3769 # does a combined result over all disks, so we don't check its
3770 # return value
3771 self.proc.LogStep(5, steps_total, "sync devices")
3772 _WaitForSync(cfg, instance, self.proc, unlock=True)
3774 # so check manually all the devices
3775 for name, (dev, old_lvs) in iv_names.iteritems():
3776 cfg.SetDiskID(dev, pri_node)
3777 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3778 if is_degr:
3779 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3781 self.proc.LogStep(6, steps_total, "removing old storage")
3782 for name, (dev, old_lvs) in iv_names.iteritems():
3783 info("remove logical volumes for %s" % name)
3785 cfg.SetDiskID(lv, old_node)
3786 if not rpc.call_blockdev_remove(old_node, lv):
3787 warning("Can't remove LV on old secondary",
3788 hint="Cleanup stale volumes by hand")
3790 def Exec(self, feedback_fn):
3791 """Execute disk replacement.
3793 This dispatches the disk replacement to the appropriate handler.
3796 instance = self.instance
3798 # Activate the instance disks if we're replacing them on a down instance
3799 if instance.status == "down":
3800 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3801 self.proc.ChainOpCode(op)
3803 if instance.disk_template == constants.DT_DRBD8:
3804 if self.op.remote_node is None:
3805 fn = self._ExecD8DiskOnly
3806 else:
3807 fn = self._ExecD8Secondary
3808 else:
3809 raise errors.ProgrammerError("Unhandled disk replacement case")
3811 ret = fn(feedback_fn)
3813 # Deactivate the instance disks if we're replacing them on a down instance
3814 if instance.status == "down":
3815 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3816 self.proc.ChainOpCode(op)
3818 return ret
3821 class LUGrowDisk(LogicalUnit):
3822 """Grow a disk of an instance.
3825 HPATH = "disk-grow"
3826 HTYPE = constants.HTYPE_INSTANCE
3827 _OP_REQP = ["instance_name", "disk", "amount"]
3829 def BuildHooksEnv(self):
3832 This runs on the master, the primary and all the secondaries.
3836 "DISK": self.op.disk,
3837 "AMOUNT": self.op.amount,
3839 env.update(_BuildInstanceHookEnvByObject(self.instance))
3841 self.sstore.GetMasterNode(),
3842 self.instance.primary_node,
3846 def CheckPrereq(self):
3847 """Check prerequisites.
3849 This checks that the instance is in the cluster.
3852 instance = self.cfg.GetInstanceInfo(
3853 self.cfg.ExpandInstanceName(self.op.instance_name))
3854 if instance is None:
3855 raise errors.OpPrereqError("Instance '%s' not known" %
3856 self.op.instance_name)
3857 self.instance = instance
3858 self.op.instance_name = instance.name
3860 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3861 raise errors.OpPrereqError("Instance's disk layout does not support"
3864 if instance.FindDisk(self.op.disk) is None:
3865 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3866 (self.op.disk, instance.name))
3868 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3869 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3870 for node in nodenames:
3871 info = nodeinfo.get(node, None)
3872 if not info:
3873 raise errors.OpPrereqError("Cannot get current information"
3874 " from node '%s'" % node)
3875 vg_free = info.get('vg_free', None)
3876 if not isinstance(vg_free, int):
3877 raise errors.OpPrereqError("Can't compute free disk space on"
3879 if self.op.amount > info['vg_free']:
3880 raise errors.OpPrereqError("Not enough disk space on target node %s:"
3881 " %d MiB available, %d MiB required" %
3882 (node, info['vg_free'], self.op.amount))
3884 def Exec(self, feedback_fn):
3885 """Execute disk grow.
3888 instance = self.instance
3889 disk = instance.FindDisk(self.op.disk)
3890 for node in (instance.secondary_nodes + (instance.primary_node,)):
3891 self.cfg.SetDiskID(disk, node)
3892 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3893 if not result or not isinstance(result, tuple) or len(result) != 2:
3894 raise errors.OpExecError("grow request failed to node %s" % node)
3896 raise errors.OpExecError("grow request failed to node %s: %s" %
3898 disk.RecordGrow(self.op.amount)
3899 self.cfg.Update(instance)
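# Hedged usage sketch: growing "sda" by 1024 MiB could be requested with an
# opcode such as the following (attribute names mirror _OP_REQP above; the
# opcode class name is assumed from the opcodes module):
#
#   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
#                           disk="sda", amount=1024)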
3903 class LUQueryInstanceData(NoHooksLU):
3904 """Query runtime instance data.
3907 _OP_REQP = ["instances"]
3909 def CheckPrereq(self):
3910 """Check prerequisites.
3912 This only checks the optional instance list against the existing names.
3915 if not isinstance(self.op.instances, list):
3916 raise errors.OpPrereqError("Invalid argument type 'instances'")
3917 if self.op.instances:
3918 self.wanted_instances = []
3919 names = self.op.instances
3920 for name in names:
3921 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3922 if instance is None:
3923 raise errors.OpPrereqError("No such instance name '%s'" % name)
3924 self.wanted_instances.append(instance)
3926 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3927 in self.cfg.GetInstanceList()]
3931 def _ComputeDiskStatus(self, instance, snode, dev):
3932 """Compute block device status.
3935 self.cfg.SetDiskID(dev, instance.primary_node)
3936 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3937 if dev.dev_type in constants.LDS_DRBD:
3938 # we change the snode then (otherwise we use the one passed in)
3939 if dev.logical_id[0] == instance.primary_node:
3940 snode = dev.logical_id[1]
3941 else:
3942 snode = dev.logical_id[0]
3944 if snode:
3945 self.cfg.SetDiskID(dev, snode)
3946 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3947 else:
3948 dev_sstatus = None
3950 if dev.children:
3951 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3952 for child in dev.children]
3953 else:
3954 dev_children = []
3956 data = {
3957 "iv_name": dev.iv_name,
3958 "dev_type": dev.dev_type,
3959 "logical_id": dev.logical_id,
3960 "physical_id": dev.physical_id,
3961 "pstatus": dev_pstatus,
3962 "sstatus": dev_sstatus,
3963 "children": dev_children,
3968 def Exec(self, feedback_fn):
3969 """Gather and return data"""
3971 for instance in self.wanted_instances:
3972 remote_info = rpc.call_instance_info(instance.primary_node,
3974 if remote_info and "state" in remote_info:
3977 remote_state = "down"
3978 if instance.status == "down":
3979 config_state = "down"
3983 disks = [self._ComputeDiskStatus(instance, None, device)
3984 for device in instance.disks]
3986 idict = {
3987 "name": instance.name,
3988 "config_state": config_state,
3989 "run_state": remote_state,
3990 "pnode": instance.primary_node,
3991 "snodes": instance.secondary_nodes,
3993 "memory": instance.memory,
3994 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
3996 "vcpus": instance.vcpus,
3999 htkind = self.sstore.GetHypervisorType()
4000 if htkind == constants.HT_XEN_PVM30:
4001 idict["kernel_path"] = instance.kernel_path
4002 idict["initrd_path"] = instance.initrd_path
4004 if htkind == constants.HT_XEN_HVM31:
4005 idict["hvm_boot_order"] = instance.hvm_boot_order
4006 idict["hvm_acpi"] = instance.hvm_acpi
4007 idict["hvm_pae"] = instance.hvm_pae
4008 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4010 if htkind in constants.HTS_REQ_PORT:
4011 idict["vnc_bind_address"] = instance.vnc_bind_address
4012 idict["network_port"] = instance.network_port
4014 result[instance.name] = idict
4016 return result
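# For reference, each entry of the returned dict has roughly this shape
# (a sketch assembled from the idict construction above; values are
# illustrative):
#
#   {"name": "inst1.example.com", "config_state": "up", "run_state": "up",
#    "pnode": "node1", "snodes": ["node2"], "os": "debian-etch",
#    "memory": 512, "nics": [("aa:00:00:11:22:33", "192.0.2.10", "xen-br0")],
#    "disks": [...], "vcpus": 1}
#
# plus the hypervisor-specific keys (kernel_path, hvm_*, vnc_bind_address,
# network_port) added above depending on the hypervisor type.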
4019 class LUSetInstanceParams(LogicalUnit):
4020 """Modifies an instances's parameters.
4023 HPATH = "instance-modify"
4024 HTYPE = constants.HTYPE_INSTANCE
4025 _OP_REQP = ["instance_name"]
4027 def BuildHooksEnv(self):
4028 """Build hooks env.
4030 This runs on the master, primary and secondaries.
4032 """
4033 args = dict()
4034 if self.mem:
4035 args['memory'] = self.mem
4036 if self.vcpus:
4037 args['vcpus'] = self.vcpus
4038 if self.do_ip or self.do_bridge or self.mac:
4039 if self.do_ip:
4040 ip = self.ip
4041 else:
4042 ip = self.instance.nics[0].ip
4043 if self.bridge:
4044 bridge = self.bridge
4045 else:
4046 bridge = self.instance.nics[0].bridge
4047 if self.mac:
4048 mac = self.mac
4049 else:
4050 mac = self.instance.nics[0].mac
4051 args['nics'] = [(ip, bridge, mac)]
4052 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4053 nl = [self.sstore.GetMasterNode(),
4054 self.instance.primary_node] + list(self.instance.secondary_nodes)
4055 return env, nl, nl
4057 def CheckPrereq(self):
4058 """Check prerequisites.
4060 This only checks the instance list against the existing names.
4062 """
4063 self.mem = getattr(self.op, "mem", None)
4064 self.vcpus = getattr(self.op, "vcpus", None)
4065 self.ip = getattr(self.op, "ip", None)
4066 self.mac = getattr(self.op, "mac", None)
4067 self.bridge = getattr(self.op, "bridge", None)
4068 self.kernel_path = getattr(self.op, "kernel_path", None)
4069 self.initrd_path = getattr(self.op, "initrd_path", None)
4070 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4071 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4072 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4073 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4074 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4075 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4076 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4077 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4078 self.vnc_bind_address]
4079 if all_parms.count(None) == len(all_parms):
4080 raise errors.OpPrereqError("No changes submitted")
4081 if self.mem is not None:
4082 try:
4083 self.mem = int(self.mem)
4084 except ValueError, err:
4085 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4086 if self.vcpus is not None:
4087 try:
4088 self.vcpus = int(self.vcpus)
4089 except ValueError, err:
4090 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4091 if self.ip is not None:
4092 self.do_ip = True
4093 if self.ip.lower() == "none":
4094 self.ip = None
4095 else:
4096 if not utils.IsValidIP(self.ip):
4097 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4098 else:
4099 self.do_ip = False
4100 self.do_bridge = (self.bridge is not None)
4101 if self.mac is not None:
4102 if self.cfg.IsMacInUse(self.mac):
4103 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4104 self.mac)
4105 if not utils.IsValidMac(self.mac):
4106 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4108 if self.kernel_path is not None:
4109 self.do_kernel_path = True
4110 if self.kernel_path == constants.VALUE_NONE:
4111 raise errors.OpPrereqError("Can't set instance to no kernel")
4113 if self.kernel_path != constants.VALUE_DEFAULT:
4114 if not os.path.isabs(self.kernel_path):
4115 raise errors.OpPrereqError("The kernel path must be an absolute"
4118 self.do_kernel_path = False
4120 if self.initrd_path is not None:
4121 self.do_initrd_path = True
4122 if self.initrd_path not in (constants.VALUE_NONE,
4123 constants.VALUE_DEFAULT):
4124 if not os.path.isabs(self.initrd_path):
4125 raise errors.OpPrereqError("The initrd path must be an absolute"
4128 self.do_initrd_path = False
4130 # boot order verification
4131 if self.hvm_boot_order is not None:
4132 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4133 if len(self.hvm_boot_order.strip("acdn")) != 0:
4134 raise errors.OpPrereqError("invalid boot order specified,"
4135 " must be one or more of [acdn]"
4138 # hvm_cdrom_image_path verification
4139 if self.op.hvm_cdrom_image_path is not None:
4140 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4141 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4142 " be an absolute path or None, not %s" %
4143 self.op.hvm_cdrom_image_path)
4144 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4145 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4146 " regular file or a symlink pointing to"
4147 " an existing regular file, not %s" %
4148 self.op.hvm_cdrom_image_path)
4150 # vnc_bind_address verification
4151 if self.op.vnc_bind_address is not None:
4152 if not utils.IsValidIP(self.op.vnc_bind_address):
4153 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4154 " like a valid IP address" %
4155 self.op.vnc_bind_address)
4157 instance = self.cfg.GetInstanceInfo(
4158 self.cfg.ExpandInstanceName(self.op.instance_name))
4159 if instance is None:
4160 raise errors.OpPrereqError("No such instance name '%s'" %
4161 self.op.instance_name)
4162 self.op.instance_name = instance.name
4163 self.instance = instance
4166 def Exec(self, feedback_fn):
4167 """Modifies an instance.
4169 All parameters take effect only at the next restart of the instance.
4172 instance = self.instance
4174 instance.memory = self.mem
4175 result.append(("mem", self.mem))
4177 instance.vcpus = self.vcpus
4178 result.append(("vcpus", self.vcpus))
4180 instance.nics[0].ip = self.ip
4181 result.append(("ip", self.ip))
4183 instance.nics[0].bridge = self.bridge
4184 result.append(("bridge", self.bridge))
4186 instance.nics[0].mac = self.mac
4187 result.append(("mac", self.mac))
4188 if self.do_kernel_path:
4189 instance.kernel_path = self.kernel_path
4190 result.append(("kernel_path", self.kernel_path))
4191 if self.do_initrd_path:
4192 instance.initrd_path = self.initrd_path
4193 result.append(("initrd_path", self.initrd_path))
4194 if self.hvm_boot_order:
4195 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4196 instance.hvm_boot_order = None
4197 else:
4198 instance.hvm_boot_order = self.hvm_boot_order
4199 result.append(("hvm_boot_order", self.hvm_boot_order))
4201 instance.hvm_acpi = self.hvm_acpi
4202 result.append(("hvm_acpi", self.hvm_acpi))
4204 instance.hvm_pae = self.hvm_pae
4205 result.append(("hvm_pae", self.hvm_pae))
4206 if self.hvm_cdrom_image_path:
4207 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4208 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4209 if self.vnc_bind_address:
4210 instance.vnc_bind_address = self.vnc_bind_address
4211 result.append(("vnc_bind_address", self.vnc_bind_address))
4213 self.cfg.AddInstance(instance)
4215 return result
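# The return value is a list of (parameter, new_value) pairs describing
# what was actually changed, e.g. (illustrative):
#
#   [("mem", 1024), ("bridge", "xen-br1")]
#
# so callers can report precisely which parameters were modified.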
4218 class LUQueryExports(NoHooksLU):
4219 """Query the exports list
4224 def CheckPrereq(self):
4225 """Check that the nodelist contains only existing nodes.
4228 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4230 def Exec(self, feedback_fn):
4231 """Compute the list of all the exported system images.
4234 a dictionary with the structure node->(export-list)
4235 where export-list is a list of the instances exported on
4239 return rpc.call_export_list(self.nodes)
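# Sketch of the result shape (values illustrative): a dict mapping each
# queried node name to the list of instance exports found on it, e.g.
#
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": []}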
4242 class LUExportInstance(LogicalUnit):
4243 """Export an instance to an image in the cluster.
4246 HPATH = "instance-export"
4247 HTYPE = constants.HTYPE_INSTANCE
4248 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4250 def BuildHooksEnv(self):
4251 """Build hooks env.
4253 This will run on the master, primary node and target node.
4255 """
4256 env = {
4257 "EXPORT_NODE": self.op.target_node,
4258 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4259 }
4260 env.update(_BuildInstanceHookEnvByObject(self.instance))
4261 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4262 self.op.target_node]
4263 return env, nl, nl
4265 def CheckPrereq(self):
4266 """Check prerequisites.
4268 This checks that the instance and node names are valid.
4270 """
4271 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4272 self.instance = self.cfg.GetInstanceInfo(instance_name)
4273 if self.instance is None:
4274 raise errors.OpPrereqError("Instance '%s' not found" %
4275 self.op.instance_name)
4278 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4279 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4281 if self.dst_node is None:
4282 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4283 self.op.target_node)
4284 self.op.target_node = self.dst_node.name
4286 # instance disk type verification
4287 for disk in self.instance.disks:
4288 if disk.dev_type == constants.LD_FILE:
4289 raise errors.OpPrereqError("Export not supported for instances with"
4290 " file-based disks")
4292 def Exec(self, feedback_fn):
4293 """Export an instance to an image in the cluster.
4296 instance = self.instance
4297 dst_node = self.dst_node
4298 src_node = instance.primary_node
4299 if self.op.shutdown:
4300 # shutdown the instance, but not the disks
4301 if not rpc.call_instance_shutdown(src_node, instance):
4302 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4303 (instance.name, src_node))
4305 vgname = self.cfg.GetVGName()
4307 snap_disks = []
4309 try:
4310 for disk in instance.disks:
4311 if disk.iv_name == "sda":
4312 # new_dev_name will be a snapshot of an LVM leaf of the disk we passed
4313 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4315 if not new_dev_name:
4316 logger.Error("could not snapshot block device %s on node %s" %
4317 (disk.logical_id[1], src_node))
4318 else:
4319 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4320 logical_id=(vgname, new_dev_name),
4321 physical_id=(vgname, new_dev_name),
4322 iv_name=disk.iv_name)
4323 snap_disks.append(new_dev)
4325 finally:
4326 if self.op.shutdown and instance.status == "up":
4327 if not rpc.call_instance_start(src_node, instance, None):
4328 _ShutdownInstanceDisks(instance, self.cfg)
4329 raise errors.OpExecError("Could not start instance")
4331 # TODO: check for size
4333 for dev in snap_disks:
4334 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4335 logger.Error("could not export block device %s from node %s to node %s"
4336 % (dev.logical_id[1], src_node, dst_node.name))
4337 if not rpc.call_blockdev_remove(src_node, dev):
4338 logger.Error("could not remove snapshot block device %s from node %s" %
4339 (dev.logical_id[1], src_node))
4341 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4342 logger.Error("could not finalize export for instance %s on node %s" %
4343 (instance.name, dst_node.name))
4345 nodelist = self.cfg.GetNodeList()
4346 nodelist.remove(dst_node.name)
4348 # on one-node clusters nodelist will be empty after the removal
4349 # if we proceed the backup would be removed because OpQueryExports
4350 # substitutes an empty list with the full cluster node list.
4351 if nodelist:
4352 op = opcodes.OpQueryExports(nodes=nodelist)
4353 exportlist = self.proc.ChainOpCode(op)
4354 for node in exportlist:
4355 if instance.name in exportlist[node]:
4356 if not rpc.call_export_remove(node, instance.name):
4357 logger.Error("could not remove older export for instance %s"
4358 " on node %s" % (instance.name, node))
4361 class LURemoveExport(NoHooksLU):
4362 """Remove exports related to the named instance.
4365 _OP_REQP = ["instance_name"]
4367 def CheckPrereq(self):
4368 """Check prerequisites.
4372 def Exec(self, feedback_fn):
4373 """Remove any export.
4376 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4377 # If the instance was not found we'll try with the name that was passed in.
4378 # This will only work if it was an FQDN, though.
4379 fqdn_warn = False
4380 if not instance_name:
4381 fqdn_warn = True
4382 instance_name = self.op.instance_name
4384 op = opcodes.OpQueryExports(nodes=[])
4385 exportlist = self.proc.ChainOpCode(op)
4386 found = False
4387 for node in exportlist:
4388 if instance_name in exportlist[node]:
4389 found = True
4390 if not rpc.call_export_remove(node, instance_name):
4391 logger.Error("could not remove export for instance %s"
4392 " on node %s" % (instance_name, node))
4394 if fqdn_warn and not found:
4395 feedback_fn("Export not found. If trying to remove an export belonging"
4396 " to a deleted instance please use its Fully Qualified"
4400 class TagsLU(NoHooksLU):
4401 """Generic tags LU.
4403 This is an abstract class which is the parent of all the other tags LUs.
4405 """
4406 def CheckPrereq(self):
4407 """Check prerequisites.
4409 """
4410 if self.op.kind == constants.TAG_CLUSTER:
4411 self.target = self.cfg.GetClusterInfo()
4412 elif self.op.kind == constants.TAG_NODE:
4413 name = self.cfg.ExpandNodeName(self.op.name)
4414 if name is None:
4415 raise errors.OpPrereqError("Invalid node name (%s)" %
4416 (self.op.name,))
4417 self.op.name = name
4418 self.target = self.cfg.GetNodeInfo(name)
4419 elif self.op.kind == constants.TAG_INSTANCE:
4420 name = self.cfg.ExpandInstanceName(self.op.name)
4421 if name is None:
4422 raise errors.OpPrereqError("Invalid instance name (%s)" %
4423 (self.op.name,))
4424 self.op.name = name
4425 self.target = self.cfg.GetInstanceInfo(name)
4427 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4431 class LUGetTags(TagsLU):
4432 """Returns the tags of a given object.
4435 _OP_REQP = ["kind", "name"]
4437 def Exec(self, feedback_fn):
4438 """Returns the tag list.
4441 return self.target.GetTags()
4444 class LUSearchTags(NoHooksLU):
4445 """Searches the tags for a given pattern.
4448 _OP_REQP = ["pattern"]
4450 def CheckPrereq(self):
4451 """Check prerequisites.
4453 This checks the pattern passed for validity by compiling it.
4455 """
4456 try:
4457 self.re = re.compile(self.op.pattern)
4458 except re.error, err:
4459 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4460 (self.op.pattern, err))
4462 def Exec(self, feedback_fn):
4463 """Returns the tag list.
4467 tgts = [("/cluster", cfg.GetClusterInfo())]
4468 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4469 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4470 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4471 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4472 results = []
4473 for path, target in tgts:
4474 for tag in target.GetTags():
4475 if self.re.search(tag):
4476 results.append((path, tag))
4477 return results
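# Example (illustrative): searching with pattern "^ha:" would return
# (path, tag) tuples such as
#
#   [("/instances/inst1.example.com", "ha:primary"), ("/nodes/node1", "ha:pool")]
#
# where the paths follow the /cluster, /instances/<name>, /nodes/<name>
# scheme built in tgts above.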
4480 class LUAddTags(TagsLU):
4481 """Sets a tag on a given object.
4484 _OP_REQP = ["kind", "name", "tags"]
4486 def CheckPrereq(self):
4487 """Check prerequisites.
4489 This checks the type and length of the tag name and value.
4491 """
4492 TagsLU.CheckPrereq(self)
4493 for tag in self.op.tags:
4494 objects.TaggableObject.ValidateTag(tag)
4496 def Exec(self, feedback_fn):
4497 """Sets the tag.
4499 """
4500 try:
4501 for tag in self.op.tags:
4502 self.target.AddTag(tag)
4503 except errors.TagError, err:
4504 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4505 try:
4506 self.cfg.Update(self.target)
4507 except errors.ConfigurationError:
4508 raise errors.OpRetryError("There has been a modification to the"
4509 " config file and the operation has been"
4510 " aborted. Please retry.")
4513 class LUDelTags(TagsLU):
4514 """Delete a list of tags from a given object.
4517 _OP_REQP = ["kind", "name", "tags"]
4519 def CheckPrereq(self):
4520 """Check prerequisites.
4522 This checks that we have the given tag.
4524 """
4525 TagsLU.CheckPrereq(self)
4526 for tag in self.op.tags:
4527 objects.TaggableObject.ValidateTag(tag)
4528 del_tags = frozenset(self.op.tags)
4529 cur_tags = self.target.GetTags()
4530 if not del_tags <= cur_tags:
4531 diff_tags = del_tags - cur_tags
4532 diff_names = ["'%s'" % tag for tag in diff_tags]
4533 diff_names.sort()
4534 raise errors.OpPrereqError("Tag(s) %s not found" %
4535 (",".join(diff_names)))
4537 def Exec(self, feedback_fn):
4538 """Remove the tag from the object.
4541 for tag in self.op.tags:
4542 self.target.RemoveTag(tag)
4543 try:
4544 self.cfg.Update(self.target)
4545 except errors.ConfigurationError:
4546 raise errors.OpRetryError("There has been a modification to the"
4547 " config file and the operation has been"
4548 " aborted. Please retry.")
4550 class LUTestDelay(NoHooksLU):
4551 """Sleep for a specified amount of time.
4553 This LU sleeps on the master and/or nodes for a specified amount of
4554 time.
4556 """
4557 _OP_REQP = ["duration", "on_master", "on_nodes"]
4559 def CheckPrereq(self):
4560 """Check prerequisites.
4562 This checks that we have a good list of nodes and/or the duration
4563 is valid.
4565 """
4567 if self.op.on_nodes:
4568 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4570 def Exec(self, feedback_fn):
4571 """Do the actual sleep.
4574 if self.op.on_master:
4575 if not utils.TestDelay(self.op.duration):
4576 raise errors.OpExecError("Error during master delay test")
4577 if self.op.on_nodes:
4578 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4580 raise errors.OpExecError("Complete failure from rpc call")
4581 for node, node_result in result.items():
4583 raise errors.OpExecError("Failure during rpc call to node %s,"
4584 " result: %s" % (node, node_result))
4587 class IAllocator(object):
4588 """IAllocator framework.
4590 An IAllocator instance has four sets of attributes:
4591 - cfg/sstore that are needed to query the cluster
4592 - input data (all members of the _KEYS class attribute are required)
4593 - four buffer attributes (in|out_data|text), that represent the
4594 input (to the external script) in text and data structure format,
4595 and the output from it, again in two formats
4596 - the result variables from the script (success, info, nodes) for
4597 easy usage
4599 """
4600 _ALLO_KEYS = [
4601 "mem_size", "disks", "disk_template",
4602 "os", "tags", "nics", "vcpus",
4603 ]
4604 _RELO_KEYS = [
4605 "relocate_from",
4606 ]
4608 def __init__(self, cfg, sstore, mode, name, **kwargs):
4609 self.cfg = cfg
4610 self.sstore = sstore
4611 # init buffer variables
4612 self.in_text = self.out_text = self.in_data = self.out_data = None
4613 # init all input fields so that pylint is happy
4614 self.mode = mode
4615 self.name = name
4616 self.mem_size = self.disks = self.disk_template = None
4617 self.os = self.tags = self.nics = self.vcpus = None
4618 self.relocate_from = None
4620 self.required_nodes = None
4621 # init result fields
4622 self.success = self.info = self.nodes = None
4623 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4624 keyset = self._ALLO_KEYS
4625 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4626 keyset = self._RELO_KEYS
4628 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4629 " IAllocator" % self.mode)
4630 for key in kwargs:
4631 if key not in keyset:
4632 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4633 " IAllocator" % key)
4634 setattr(self, key, kwargs[key])
4635 for key in keyset:
4636 if key not in kwargs:
4637 raise errors.ProgrammerError("Missing input parameter '%s' to"
4638 " IAllocator" % key)
4639 self._BuildInputData()
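# Construction sketch (illustrative values): in allocate mode every key in
# _ALLO_KEYS must be supplied, e.g.
#
#   ial = IAllocator(cfg, sstore,
#                    mode=constants.IALLOCATOR_MODE_ALLOC,
#                    name="inst1.example.com",
#                    mem_size=512,
#                    disks=[{"size": 1024, "mode": "w"},
#                           {"size": 4096, "mode": "w"}],
#                    disk_template=constants.DT_DRBD8,
#                    os="debian-etch", tags=[], vcpus=1,
#                    nics=[{"mac": "00:11:22:33:44:55", "ip": None,
#                           "bridge": "xen-br0"}])
#
# Passing an unknown key, or omitting one from the keyset, raises
# ProgrammerError as enforced above.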
4641 def _ComputeClusterData(self):
4642 """Compute the generic allocator input data.
4644 This is the data that is independent of the actual operation.
4651 "cluster_name": self.sstore.GetClusterName(),
4652 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4653 "hypervisor_type": self.sstore.GetHypervisorType(),
4654 # we don't have job IDs
4655 }
4657 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4659 # node data
4660 node_results = {}
4661 node_list = cfg.GetNodeList()
4662 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4663 for nname in node_list:
4664 ninfo = cfg.GetNodeInfo(nname)
4665 if nname not in node_data or not isinstance(node_data[nname], dict):
4666 raise errors.OpExecError("Can't get data for node %s" % nname)
4667 remote_info = node_data[nname]
4668 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4669 'vg_size', 'vg_free', 'cpu_total']:
4670 if attr not in remote_info:
4671 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4672 (nname, attr))
4673 try:
4674 remote_info[attr] = int(remote_info[attr])
4675 except ValueError, err:
4676 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4677 " %s" % (nname, attr, str(err)))
4678 # compute memory used by primary instances
4679 i_p_mem = i_p_up_mem = 0
4680 for iinfo in i_list:
4681 if iinfo.primary_node == nname:
4682 i_p_mem += iinfo.memory
4683 if iinfo.status == "up":
4684 i_p_up_mem += iinfo.memory
4686 # compute memory used by instances
4688 "tags": list(ninfo.GetTags()),
4689 "total_memory": remote_info['memory_total'],
4690 "reserved_memory": remote_info['memory_dom0'],
4691 "free_memory": remote_info['memory_free'],
4692 "i_pri_memory": i_p_mem,
4693 "i_pri_up_memory": i_p_up_mem,
4694 "total_disk": remote_info['vg_size'],
4695 "free_disk": remote_info['vg_free'],
4696 "primary_ip": ninfo.primary_ip,
4697 "secondary_ip": ninfo.secondary_ip,
4698 "total_cpus": remote_info['cpu_total'],
4699 }
4700 node_results[nname] = pnr
4701 data["nodes"] = node_results
4703 # instance data
4704 instance_data = {}
4705 for iinfo in i_list:
4706 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4707 for n in iinfo.nics]
4709 "tags": list(iinfo.GetTags()),
4710 "should_run": iinfo.status == "up",
4711 "vcpus": iinfo.vcpus,
4712 "memory": iinfo.memory,
4714 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4716 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4717 "disk_template": iinfo.disk_template,
4718 }
4719 instance_data[iinfo.name] = pir
4721 data["instances"] = instance_data
4723 self.in_data = data
4725 def _AddNewInstance(self):
4726 """Add new instance data to allocator structure.
4728 This in combination with _ComputeClusterData will create the
4729 correct structure needed as input for the allocator.
4731 The checks for the completeness of the opcode must have already been
4732 done.
4734 """
4735 data = self.in_data
4736 if len(self.disks) != 2:
4737 raise errors.OpExecError("Only two-disk configurations supported")
4739 disk_space = _ComputeDiskSize(self.disk_template,
4740 self.disks[0]["size"], self.disks[1]["size"])
4742 if self.disk_template in constants.DTS_NET_MIRROR:
4743 self.required_nodes = 2
4744 else:
4745 self.required_nodes = 1
4746 request = {
4747 "type": "allocate",
4748 "name": self.name,
4749 "disk_template": self.disk_template,
4752 "vcpus": self.vcpus,
4753 "memory": self.mem_size,
4754 "disks": self.disks,
4755 "disk_space_total": disk_space,
4757 "required_nodes": self.required_nodes,
4759 data["request"] = request
4761 def _AddRelocateInstance(self):
4762 """Add relocate instance data to allocator structure.
4764 This in combination with _ComputeClusterData will create the
4765 correct structure needed as input for the allocator.
4767 The checks for the completeness of the opcode must have already been
4768 done.
4770 """
4771 instance = self.cfg.GetInstanceInfo(self.name)
4772 if instance is None:
4773 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4774 " IAllocator" % self.name)
4776 if instance.disk_template not in constants.DTS_NET_MIRROR:
4777 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4779 if len(instance.secondary_nodes) != 1:
4780 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4782 self.required_nodes = 1
4784 disk_space = _ComputeDiskSize(instance.disk_template,
4785 instance.disks[0].size,
4786 instance.disks[1].size)
4791 "disk_space_total": disk_space,
4792 "required_nodes": self.required_nodes,
4793 "relocate_from": self.relocate_from,
4795 self.in_data["request"] = request
4797 def _BuildInputData(self):
4798 """Build input data structures.
4801 self._ComputeClusterData()
4803 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4804 self._AddNewInstance()
4805 else:
4806 self._AddRelocateInstance()
4808 self.in_text = serializer.Dump(self.in_data)
4810 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4811 """Run an instance allocator and return the results.
4816 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4818 if not isinstance(result, tuple) or len(result) != 4:
4819 raise errors.OpExecError("Invalid result from master iallocator runner")
4821 rcode, stdout, stderr, fail = result
4823 if rcode == constants.IARUN_NOTFOUND:
4824 raise errors.OpExecError("Can't find allocator '%s'" % name)
4825 elif rcode == constants.IARUN_FAILURE:
4826 raise errors.OpExecError("Instance allocator call failed: %s,"
4827 " output: %s" %
4828 (fail, stdout+stderr))
4829 self.out_text = stdout
4830 if validate:
4831 self._ValidateResult()
4833 def _ValidateResult(self):
4834 """Process the allocator results.
4836 This will process and if successful save the result in
4837 self.out_data and the other parameters.
4839 """
4840 try:
4841 rdict = serializer.Load(self.out_text)
4842 except Exception, err:
4843 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4845 if not isinstance(rdict, dict):
4846 raise errors.OpExecError("Can't parse iallocator results: not a dict")
4848 for key in "success", "info", "nodes":
4849 if key not in rdict:
4850 raise errors.OpExecError("Can't parse iallocator results:"
4851 " missing key '%s'" % key)
4852 setattr(self, key, rdict[key])
4854 if not isinstance(rdict["nodes"], list):
4855 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4856 " not a list")
4857 self.out_data = rdict
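# A minimal well-formed allocator reply, per the checks above (sketch):
#
#   {"success": true, "info": "allocation successful",
#    "nodes": ["node1.example.com", "node2.example.com"]}
#
# Missing keys, a non-dict payload, or a non-list "nodes" value all raise
# OpExecError here.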
4860 class LUTestAllocator(NoHooksLU):
4861 """Run allocator tests.
4863 This LU runs the allocator tests.
4865 """
4866 _OP_REQP = ["direction", "mode", "name"]
4868 def CheckPrereq(self):
4869 """Check prerequisites.
4871 This checks the opcode parameters depending on the direction and mode of the test.
4873 """
4874 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4875 for attr in ["name", "mem_size", "disks", "disk_template",
4876 "os", "tags", "nics", "vcpus"]:
4877 if not hasattr(self.op, attr):
4878 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4880 iname = self.cfg.ExpandInstanceName(self.op.name)
4881 if iname is not None:
4882 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4884 if not isinstance(self.op.nics, list):
4885 raise errors.OpPrereqError("Invalid parameter 'nics'")
4886 for row in self.op.nics:
4887 if (not isinstance(row, dict) or
4890 "bridge" not in row):
4891 raise errors.OpPrereqError("Invalid contents of the"
4892 " 'nics' parameter")
4893 if not isinstance(self.op.disks, list):
4894 raise errors.OpPrereqError("Invalid parameter 'disks'")
4895 if len(self.op.disks) != 2:
4896 raise errors.OpPrereqError("Only two-disk configurations supported")
4897 for row in self.op.disks:
4898 if (not isinstance(row, dict) or
4899 "size" not in row or
4900 not isinstance(row["size"], int) or
4901 "mode" not in row or
4902 row["mode"] not in ['r', 'w']):
4903 raise errors.OpPrereqError("Invalid contents of the"
4904 " 'disks' parameter")
4905 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4906 if not hasattr(self.op, "name"):
4907 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4908 fname = self.cfg.ExpandInstanceName(self.op.name)
4910 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4912 self.op.name = fname
4913 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4915 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4918 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4919 if not hasattr(self.op, "allocator") or self.op.allocator is None:
4920 raise errors.OpPrereqError("Missing allocator name")
4921 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4922 raise errors.OpPrereqError("Wrong allocator test '%s'" %
4925 def Exec(self, feedback_fn):
4926 """Run the allocator test.
4929 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4930 ial = IAllocator(self.cfg, self.sstore,
4931 mode=self.op.mode,
4932 name=self.op.name,
4933 mem_size=self.op.mem_size,
4934 disks=self.op.disks,
4935 disk_template=self.op.disk_template,
4936 os=self.op.os,
4937 tags=self.op.tags,
4938 nics=self.op.nics,
4939 vcpus=self.op.vcpus,
4940 )
4941 else:
4942 ial = IAllocator(self.cfg, self.sstore,
4943 mode=self.op.mode,
4944 name=self.op.name,
4945 relocate_from=list(self.relocate_from),
4946 )
4948 if self.op.direction == constants.IALLOCATOR_DIR_IN:
4949 result = ial.in_text
4950 else:
4951 ial.Run(self.op.allocator, validate=False)
4952 result = ial.out_text
4953 return result