4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import config
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import ssconf
45 from ganeti import serializer
48 class LogicalUnit(object):
49 """Logical Unit base class.
51 Subclasses must follow these rules:
52 - implement CheckPrereq which also fills in the opcode instance
53 with all the fields (even if as None)
55 - implement BuildHooksEnv
56 - redefine HPATH and HTYPE
57 - optionally redefine their run requirements:
58 REQ_MASTER: the LU needs to run on the master node
59 REQ_WSSTORE: the LU needs a writable SimpleStore
61 Note that all commands require root permissions.
70 def __init__(self, processor, op, cfg, sstore):
71 """Constructor for LogicalUnit.
73 This needs to be overriden in derived classes in order to check op
83 for attr_name in self._OP_REQP:
84 attr_val = getattr(op, attr_name, None)
86 raise errors.OpPrereqError("Required parameter '%s' missing" %
89 if not cfg.IsCluster():
90 raise errors.OpPrereqError("Cluster not initialized yet,"
91 " use 'gnt-cluster init' first.")
93 master = sstore.GetMasterNode()
94 if master != utils.HostInfo().name:
95 raise errors.OpPrereqError("Commands must be run on the master"
99 """Returns the SshRunner object
103 self.__ssh = ssh.SshRunner(self.sstore)
106 ssh = property(fget=__GetSSH)
108 def CheckPrereq(self):
109 """Check prerequisites for this LU.
111 This method should check that the prerequisites for the execution
112 of this LU are fulfilled. It can do internode communication, but
113 it should be idempotent - no cluster or system changes are
116 The method should raise errors.OpPrereqError in case something is
117 not fulfilled. Its return value is ignored.
119 This method should also update all the parameters of the opcode to
120 their canonical form; e.g. a short node name must be fully
121 expanded after this method has successfully completed (so that
122 hooks, logging, etc. work correctly).
125 raise NotImplementedError
127 def Exec(self, feedback_fn):
130 This method should implement the actual work. It should raise
131 errors.OpExecError for failures that are somewhat dealt with in
135 raise NotImplementedError
137 def BuildHooksEnv(self):
138 """Build hooks environment for this LU.
140 This method should return a three-node tuple consisting of: a dict
141 containing the environment that will be used for running the
142 specific hook for this LU, a list of node names on which the hook
143 should run before the execution, and a list of node names on which
144 the hook should run after the execution.
146 The keys of the dict must not have 'GANETI_' prefixed as this will
147 be handled in the hooks runner. Also note additional keys will be
148 added by the hooks runner. If the LU doesn't define any
149 environment, an empty dict (and not None) should be returned.
151 No nodes should be returned as an empty list (and not None).
153 Note that if the HPATH for a LU class is None, this function will
157 raise NotImplementedError
159 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
160 """Notify the LU about the results of its hooks.
162 This method is called every time a hooks phase is executed, and notifies
163 the Logical Unit about the hooks' result. The LU can then use it to alter
164 its result based on the hooks. By default the method does nothing and the
165 previous result is passed back unchanged but any LU can define it if it
166 wants to use the local cluster hook-scripts somehow.
169 phase: the hooks phase that has just been run
170 hooks_results: the results of the multi-node hooks rpc call
171 feedback_fn: function to send feedback back to the caller
172 lu_result: the previous result this LU had, or None in the PRE phase.
178 class NoHooksLU(LogicalUnit):
179 """Simple LU which runs no hooks.
181 This LU is intended as a parent for other LogicalUnits which will
182 run no hooks, in order to reduce duplicate code.
189 def _GetWantedNodes(lu, nodes):
190 """Returns list of checked and expanded node names.
193 nodes: List of nodes (strings) or None for all
196 if not isinstance(nodes, list):
197 raise errors.OpPrereqError("Invalid argument type 'nodes'")
203 node = lu.cfg.ExpandNodeName(name)
205 raise errors.OpPrereqError("No such node name '%s'" % name)
209 wanted = lu.cfg.GetNodeList()
210 return utils.NiceSort(wanted)
213 def _GetWantedInstances(lu, instances):
214 """Returns list of checked and expanded instance names.
217 instances: List of instances (strings) or None for all
220 if not isinstance(instances, list):
221 raise errors.OpPrereqError("Invalid argument type 'instances'")
226 for name in instances:
227 instance = lu.cfg.ExpandInstanceName(name)
229 raise errors.OpPrereqError("No such instance name '%s'" % name)
230 wanted.append(instance)
233 wanted = lu.cfg.GetInstanceList()
234 return utils.NiceSort(wanted)
237 def _CheckOutputFields(static, dynamic, selected):
238 """Checks whether all selected fields are valid.
241 static: Static fields
242 dynamic: Dynamic fields
245 static_fields = frozenset(static)
246 dynamic_fields = frozenset(dynamic)
248 all_fields = static_fields | dynamic_fields
250 if not all_fields.issuperset(selected):
251 raise errors.OpPrereqError("Unknown output fields selected: %s"
252 % ",".join(frozenset(selected).
253 difference(all_fields)))
256 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
257 memory, vcpus, nics):
258 """Builds instance related env variables for hooks from single variables.
261 secondary_nodes: List of secondary nodes as strings
265 "INSTANCE_NAME": name,
266 "INSTANCE_PRIMARY": primary_node,
267 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
268 "INSTANCE_OS_TYPE": os_type,
269 "INSTANCE_STATUS": status,
270 "INSTANCE_MEMORY": memory,
271 "INSTANCE_VCPUS": vcpus,
275 nic_count = len(nics)
276 for idx, (ip, bridge, mac) in enumerate(nics):
279 env["INSTANCE_NIC%d_IP" % idx] = ip
280 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
281 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
285 env["INSTANCE_NIC_COUNT"] = nic_count
290 def _BuildInstanceHookEnvByObject(instance, override=None):
291 """Builds instance related env variables for hooks from an object.
294 instance: objects.Instance object of instance
295 override: dict of values to override
298 'name': instance.name,
299 'primary_node': instance.primary_node,
300 'secondary_nodes': instance.secondary_nodes,
301 'os_type': instance.os,
302 'status': instance.os,
303 'memory': instance.memory,
304 'vcpus': instance.vcpus,
305 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
308 args.update(override)
309 return _BuildInstanceHookEnv(**args)
312 def _CheckInstanceBridgesExist(instance):
313 """Check that the brigdes needed by an instance exist.
316 # check bridges existance
317 brlist = [nic.bridge for nic in instance.nics]
318 if not rpc.call_bridges_exist(instance.primary_node, brlist):
319 raise errors.OpPrereqError("one or more target bridges %s does not"
320 " exist on destination node '%s'" %
321 (brlist, instance.primary_node))
324 class LUDestroyCluster(NoHooksLU):
325 """Logical unit for destroying the cluster.
330 def CheckPrereq(self):
331 """Check prerequisites.
333 This checks whether the cluster is empty.
335 Any errors are signalled by raising errors.OpPrereqError.
338 master = self.sstore.GetMasterNode()
340 nodelist = self.cfg.GetNodeList()
341 if len(nodelist) != 1 or nodelist[0] != master:
342 raise errors.OpPrereqError("There are still %d node(s) in"
343 " this cluster." % (len(nodelist) - 1))
344 instancelist = self.cfg.GetInstanceList()
346 raise errors.OpPrereqError("There are still %d instance(s) in"
347 " this cluster." % len(instancelist))
349 def Exec(self, feedback_fn):
350 """Destroys the cluster.
353 master = self.sstore.GetMasterNode()
354 if not rpc.call_node_stop_master(master):
355 raise errors.OpExecError("Could not disable the master role")
356 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
357 utils.CreateBackup(priv_key)
358 utils.CreateBackup(pub_key)
359 rpc.call_node_leave_cluster(master)
362 class LUVerifyCluster(LogicalUnit):
363 """Verifies the cluster status.
366 HPATH = "cluster-verify"
367 HTYPE = constants.HTYPE_CLUSTER
368 _OP_REQP = ["skip_checks"]
370 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
371 remote_version, feedback_fn):
372 """Run multiple tests against a node.
375 - compares ganeti version
376 - checks vg existance and size > 20G
377 - checks config file checksum
378 - checks ssh to other nodes
381 node: name of the node to check
382 file_list: required list of files
383 local_cksum: dictionary of local files and their checksums
386 # compares ganeti version
387 local_version = constants.PROTOCOL_VERSION
388 if not remote_version:
389 feedback_fn(" - ERROR: connection to %s failed" % (node))
392 if local_version != remote_version:
393 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
394 (local_version, node, remote_version))
397 # checks vg existance and size > 20G
401 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
405 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
406 constants.MIN_VG_SIZE)
408 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
411 # checks config file checksum
414 if 'filelist' not in node_result:
416 feedback_fn(" - ERROR: node hasn't returned file checksum data")
418 remote_cksum = node_result['filelist']
419 for file_name in file_list:
420 if file_name not in remote_cksum:
422 feedback_fn(" - ERROR: file '%s' missing" % file_name)
423 elif remote_cksum[file_name] != local_cksum[file_name]:
425 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
427 if 'nodelist' not in node_result:
429 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
431 if node_result['nodelist']:
433 for node in node_result['nodelist']:
434 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
435 (node, node_result['nodelist'][node]))
436 if 'node-net-test' not in node_result:
438 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
440 if node_result['node-net-test']:
442 nlist = utils.NiceSort(node_result['node-net-test'].keys())
444 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
445 (node, node_result['node-net-test'][node]))
447 hyp_result = node_result.get('hypervisor', None)
448 if hyp_result is not None:
449 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
452 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
453 node_instance, feedback_fn):
454 """Verify an instance.
456 This function checks to see if the required block devices are
457 available on the instance's node.
462 node_current = instanceconfig.primary_node
465 instanceconfig.MapLVsByNode(node_vol_should)
467 for node in node_vol_should:
468 for volume in node_vol_should[node]:
469 if node not in node_vol_is or volume not in node_vol_is[node]:
470 feedback_fn(" - ERROR: volume %s missing on node %s" %
474 if not instanceconfig.status == 'down':
475 if (node_current not in node_instance or
476 not instance in node_instance[node_current]):
477 feedback_fn(" - ERROR: instance %s not running on node %s" %
478 (instance, node_current))
481 for node in node_instance:
482 if (not node == node_current):
483 if instance in node_instance[node]:
484 feedback_fn(" - ERROR: instance %s should not run on node %s" %
490 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
491 """Verify if there are any unknown volumes in the cluster.
493 The .os, .swap and backup volumes are ignored. All other volumes are
499 for node in node_vol_is:
500 for volume in node_vol_is[node]:
501 if node not in node_vol_should or volume not in node_vol_should[node]:
502 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
507 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
508 """Verify the list of running instances.
510 This checks what instances are running but unknown to the cluster.
514 for node in node_instance:
515 for runninginstance in node_instance[node]:
516 if runninginstance not in instancelist:
517 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
518 (runninginstance, node))
522 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
523 """Verify N+1 Memory Resilience.
525 Check that if one single node dies we can still start all the instances it
531 for node, nodeinfo in node_info.iteritems():
532 # This code checks that every node which is now listed as secondary has
533 # enough memory to host all instances it is supposed to should a single
534 # other node in the cluster fail.
535 # FIXME: not ready for failover to an arbitrary node
536 # FIXME: does not support file-backed instances
537 # WARNING: we currently take into account down instances as well as up
538 # ones, considering that even if they're down someone might want to start
539 # them even in the event of a node failure.
540 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
542 for instance in instances:
543 needed_mem += instance_cfg[instance].memory
544 if nodeinfo['mfree'] < needed_mem:
545 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
546 " failovers should node %s fail" % (node, prinode))
550 def CheckPrereq(self):
551 """Check prerequisites.
553 Transform the list of checks we're going to skip into a set and check that
554 all its members are valid.
557 self.skip_set = frozenset(self.op.skip_checks)
558 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
559 raise errors.OpPrereqError("Invalid checks to be skipped specified")
561 def BuildHooksEnv(self):
564 Cluster-Verify hooks just rone in the post phase and their failure makes
565 the output be logged in the verify output and the verification to fail.
568 all_nodes = self.cfg.GetNodeList()
569 # TODO: populate the environment with useful information for verify hooks
571 return env, [], all_nodes
573 def Exec(self, feedback_fn):
574 """Verify integrity of cluster, performing various test on nodes.
578 feedback_fn("* Verifying global settings")
579 for msg in self.cfg.VerifyConfig():
580 feedback_fn(" - ERROR: %s" % msg)
582 vg_name = self.cfg.GetVGName()
583 nodelist = utils.NiceSort(self.cfg.GetNodeList())
584 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
585 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
586 i_non_redundant = [] # Non redundant instances
592 # FIXME: verify OS list
594 file_names = list(self.sstore.GetFileList())
595 file_names.append(constants.SSL_CERT_FILE)
596 file_names.append(constants.CLUSTER_CONF_FILE)
597 local_checksums = utils.FingerprintFiles(file_names)
599 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
600 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
601 all_instanceinfo = rpc.call_instance_list(nodelist)
602 all_vglist = rpc.call_vg_list(nodelist)
603 node_verify_param = {
604 'filelist': file_names,
605 'nodelist': nodelist,
607 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
608 for node in nodeinfo]
610 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
611 all_rversion = rpc.call_version(nodelist)
612 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
614 for node in nodelist:
615 feedback_fn("* Verifying node %s" % node)
616 result = self._VerifyNode(node, file_names, local_checksums,
617 all_vglist[node], all_nvinfo[node],
618 all_rversion[node], feedback_fn)
622 volumeinfo = all_volumeinfo[node]
624 if isinstance(volumeinfo, basestring):
625 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
626 (node, volumeinfo[-400:].encode('string_escape')))
628 node_volume[node] = {}
629 elif not isinstance(volumeinfo, dict):
630 feedback_fn(" - ERROR: connection to %s failed" % (node,))
634 node_volume[node] = volumeinfo
637 nodeinstance = all_instanceinfo[node]
638 if type(nodeinstance) != list:
639 feedback_fn(" - ERROR: connection to %s failed" % (node,))
643 node_instance[node] = nodeinstance
646 nodeinfo = all_ninfo[node]
647 if not isinstance(nodeinfo, dict):
648 feedback_fn(" - ERROR: connection to %s failed" % (node,))
654 "mfree": int(nodeinfo['memory_free']),
655 "dfree": int(nodeinfo['vg_free']),
658 # dictionary holding all instances this node is secondary for,
659 # grouped by their primary node. Each key is a cluster node, and each
660 # value is a list of instances which have the key as primary and the
661 # current node as secondary. this is handy to calculate N+1 memory
662 # availability if you can only failover from a primary to its
664 "sinst-by-pnode": {},
667 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
673 for instance in instancelist:
674 feedback_fn("* Verifying instance %s" % instance)
675 inst_config = self.cfg.GetInstanceInfo(instance)
676 result = self._VerifyInstance(instance, inst_config, node_volume,
677 node_instance, feedback_fn)
680 inst_config.MapLVsByNode(node_vol_should)
682 instance_cfg[instance] = inst_config
684 pnode = inst_config.primary_node
685 if pnode in node_info:
686 node_info[pnode]['pinst'].append(instance)
688 feedback_fn(" - ERROR: instance %s, connection to primary node"
689 " %s failed" % (instance, pnode))
692 # If the instance is non-redundant we cannot survive losing its primary
693 # node, so we are not N+1 compliant. On the other hand we have no disk
694 # templates with more than one secondary so that situation is not well
696 # FIXME: does not support file-backed instances
697 if len(inst_config.secondary_nodes) == 0:
698 i_non_redundant.append(instance)
699 elif len(inst_config.secondary_nodes) > 1:
700 feedback_fn(" - WARNING: multiple secondaries for instance %s"
703 for snode in inst_config.secondary_nodes:
704 if snode in node_info:
705 node_info[snode]['sinst'].append(instance)
706 if pnode not in node_info[snode]['sinst-by-pnode']:
707 node_info[snode]['sinst-by-pnode'][pnode] = []
708 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
710 feedback_fn(" - ERROR: instance %s, connection to secondary node"
711 " %s failed" % (instance, snode))
713 feedback_fn("* Verifying orphan volumes")
714 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
718 feedback_fn("* Verifying remaining instances")
719 result = self._VerifyOrphanInstances(instancelist, node_instance,
723 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
724 feedback_fn("* Verifying N+1 Memory redundancy")
725 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
728 feedback_fn("* Other Notes")
730 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
731 % len(i_non_redundant))
735 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
736 """Analize the post-hooks' result, handle it, and send some
737 nicely-formatted feedback back to the user.
740 phase: the hooks phase that has just been run
741 hooks_results: the results of the multi-node hooks rpc call
742 feedback_fn: function to send feedback back to the caller
743 lu_result: previous Exec result
746 # We only really run POST phase hooks, and are only interested in their results
747 if phase == constants.HOOKS_PHASE_POST:
748 # Used to change hooks' output to proper indentation
749 indent_re = re.compile('^', re.M)
750 feedback_fn("* Hooks Results")
751 if not hooks_results:
752 feedback_fn(" - ERROR: general communication failure")
755 for node_name in hooks_results:
756 show_node_header = True
757 res = hooks_results[node_name]
758 if res is False or not isinstance(res, list):
759 feedback_fn(" Communication failure")
762 for script, hkr, output in res:
763 if hkr == constants.HKR_FAIL:
764 # The node header is only shown once, if there are
765 # failing hooks on that node
767 feedback_fn(" Node %s:" % node_name)
768 show_node_header = False
769 feedback_fn(" ERROR: Script %s failed, output:" % script)
770 output = indent_re.sub(' ', output)
771 feedback_fn("%s" % output)
777 class LUVerifyDisks(NoHooksLU):
778 """Verifies the cluster disks status.
783 def CheckPrereq(self):
784 """Check prerequisites.
786 This has no prerequisites.
791 def Exec(self, feedback_fn):
792 """Verify integrity of cluster disks.
795 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
797 vg_name = self.cfg.GetVGName()
798 nodes = utils.NiceSort(self.cfg.GetNodeList())
799 instances = [self.cfg.GetInstanceInfo(name)
800 for name in self.cfg.GetInstanceList()]
803 for inst in instances:
805 if (inst.status != "up" or
806 inst.disk_template not in constants.DTS_NET_MIRROR):
808 inst.MapLVsByNode(inst_lvs)
809 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
810 for node, vol_list in inst_lvs.iteritems():
812 nv_dict[(node, vol)] = inst
817 node_lvs = rpc.call_volume_list(nodes, vg_name)
824 if isinstance(lvs, basestring):
825 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
827 elif not isinstance(lvs, dict):
828 logger.Info("connection to node %s failed or invalid data returned" %
830 res_nodes.append(node)
833 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
834 inst = nv_dict.pop((node, lv_name), None)
835 if (not lv_online and inst is not None
836 and inst.name not in res_instances):
837 res_instances.append(inst.name)
839 # any leftover items in nv_dict are missing LVs, let's arrange the
841 for key, inst in nv_dict.iteritems():
842 if inst.name not in res_missing:
843 res_missing[inst.name] = []
844 res_missing[inst.name].append(key)
849 class LURenameCluster(LogicalUnit):
850 """Rename the cluster.
853 HPATH = "cluster-rename"
854 HTYPE = constants.HTYPE_CLUSTER
858 def BuildHooksEnv(self):
863 "OP_TARGET": self.sstore.GetClusterName(),
864 "NEW_NAME": self.op.name,
866 mn = self.sstore.GetMasterNode()
867 return env, [mn], [mn]
869 def CheckPrereq(self):
870 """Verify that the passed name is a valid one.
873 hostname = utils.HostInfo(self.op.name)
875 new_name = hostname.name
876 self.ip = new_ip = hostname.ip
877 old_name = self.sstore.GetClusterName()
878 old_ip = self.sstore.GetMasterIP()
879 if new_name == old_name and new_ip == old_ip:
880 raise errors.OpPrereqError("Neither the name nor the IP address of the"
881 " cluster has changed")
883 result = utils.RunCmd(["fping", "-q", new_ip])
884 if not result.failed:
885 raise errors.OpPrereqError("The given cluster IP address (%s) is"
886 " reachable on the network. Aborting." %
889 self.op.name = new_name
891 def Exec(self, feedback_fn):
892 """Rename the cluster.
895 clustername = self.op.name
899 # shutdown the master IP
900 master = ss.GetMasterNode()
901 if not rpc.call_node_stop_master(master):
902 raise errors.OpExecError("Could not disable the master role")
906 ss.SetKey(ss.SS_MASTER_IP, ip)
907 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
909 # Distribute updated ss config to all nodes
910 myself = self.cfg.GetNodeInfo(master)
911 dist_nodes = self.cfg.GetNodeList()
912 if myself.name in dist_nodes:
913 dist_nodes.remove(myself.name)
915 logger.Debug("Copying updated ssconf data to all nodes")
916 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
917 fname = ss.KeyToFilename(keyname)
918 result = rpc.call_upload_file(dist_nodes, fname)
919 for to_node in dist_nodes:
920 if not result[to_node]:
921 logger.Error("copy of file %s to node %s failed" %
924 if not rpc.call_node_start_master(master):
925 logger.Error("Could not re-enable the master role on the master,"
926 " please restart manually.")
929 def _RecursiveCheckIfLVMBased(disk):
930 """Check if the given disk or its children are lvm-based.
933 disk: ganeti.objects.Disk object
936 boolean indicating whether a LD_LV dev_type was found or not
940 for chdisk in disk.children:
941 if _RecursiveCheckIfLVMBased(chdisk):
943 return disk.dev_type == constants.LD_LV
946 class LUSetClusterParams(LogicalUnit):
947 """Change the parameters of the cluster.
950 HPATH = "cluster-modify"
951 HTYPE = constants.HTYPE_CLUSTER
954 def BuildHooksEnv(self):
959 "OP_TARGET": self.sstore.GetClusterName(),
960 "NEW_VG_NAME": self.op.vg_name,
962 mn = self.sstore.GetMasterNode()
963 return env, [mn], [mn]
965 def CheckPrereq(self):
966 """Check prerequisites.
968 This checks whether the given params don't conflict and
969 if the given volume group is valid.
972 if not self.op.vg_name:
973 instances = [self.cfg.GetInstanceInfo(name)
974 for name in self.cfg.GetInstanceList()]
975 for inst in instances:
976 for disk in inst.disks:
977 if _RecursiveCheckIfLVMBased(disk):
978 raise errors.OpPrereqError("Cannot disable lvm storage while"
979 " lvm-based instances exist")
981 # if vg_name not None, checks given volume group on all nodes
983 node_list = self.cfg.GetNodeList()
984 vglist = rpc.call_vg_list(node_list)
985 for node in node_list:
986 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
987 constants.MIN_VG_SIZE)
989 raise errors.OpPrereqError("Error on node '%s': %s" %
992 def Exec(self, feedback_fn):
993 """Change the parameters of the cluster.
996 if self.op.vg_name != self.cfg.GetVGName():
997 self.cfg.SetVGName(self.op.vg_name)
999 feedback_fn("Cluster LVM configuration already in desired"
1000 " state, not changing")
1003 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1004 """Sleep and poll for an instance's disk to sync.
1007 if not instance.disks:
1011 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1013 node = instance.primary_node
1015 for dev in instance.disks:
1016 cfgw.SetDiskID(dev, node)
1022 cumul_degraded = False
1023 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1025 proc.LogWarning("Can't get any data from node %s" % node)
1028 raise errors.RemoteError("Can't contact node %s for mirror data,"
1029 " aborting." % node)
1033 for i in range(len(rstats)):
1036 proc.LogWarning("Can't compute data for node %s/%s" %
1037 (node, instance.disks[i].iv_name))
1039 # we ignore the ldisk parameter
1040 perc_done, est_time, is_degraded, _ = mstat
1041 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1042 if perc_done is not None:
1044 if est_time is not None:
1045 rem_time = "%d estimated seconds remaining" % est_time
1048 rem_time = "no time estimate"
1049 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1050 (instance.disks[i].iv_name, perc_done, rem_time))
1055 #utils.Unlock('cmd')
1058 time.sleep(min(60, max_time))
1065 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1066 return not cumul_degraded
1069 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1070 """Check that mirrors are not degraded.
1072 The ldisk parameter, if True, will change the test from the
1073 is_degraded attribute (which represents overall non-ok status for
1074 the device(s)) to the ldisk (representing the local storage status).
1077 cfgw.SetDiskID(dev, node)
1084 if on_primary or dev.AssembleOnSecondary():
1085 rstats = rpc.call_blockdev_find(node, dev)
1087 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1090 result = result and (not rstats[idx])
1092 for child in dev.children:
1093 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1098 class LUDiagnoseOS(NoHooksLU):
1099 """Logical unit for OS diagnose/query.
1102 _OP_REQP = ["output_fields", "names"]
1104 def CheckPrereq(self):
1105 """Check prerequisites.
1107 This always succeeds, since this is a pure query LU.
1111 raise errors.OpPrereqError("Selective OS query not supported")
1113 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1114 _CheckOutputFields(static=[],
1115 dynamic=self.dynamic_fields,
1116 selected=self.op.output_fields)
1119 def _DiagnoseByOS(node_list, rlist):
1120 """Remaps a per-node return list into an a per-os per-node dictionary
1123 node_list: a list with the names of all nodes
1124 rlist: a map with node names as keys and OS objects as values
1127 map: a map with osnames as keys and as value another map, with
1129 keys and list of OS objects as values
1130 e.g. {"debian-etch": {"node1": [<object>,...],
1131 "node2": [<object>,]}
1136 for node_name, nr in rlist.iteritems():
1140 if os_obj.name not in all_os:
1141 # build a list of nodes for this os containing empty lists
1142 # for each node in node_list
1143 all_os[os_obj.name] = {}
1144 for nname in node_list:
1145 all_os[os_obj.name][nname] = []
1146 all_os[os_obj.name][node_name].append(os_obj)
1149 def Exec(self, feedback_fn):
1150 """Compute the list of OSes.
1153 node_list = self.cfg.GetNodeList()
1154 node_data = rpc.call_os_diagnose(node_list)
1155 if node_data == False:
1156 raise errors.OpExecError("Can't gather the list of OSes")
1157 pol = self._DiagnoseByOS(node_list, node_data)
1159 for os_name, os_data in pol.iteritems():
1161 for field in self.op.output_fields:
1164 elif field == "valid":
1165 val = utils.all([osl and osl[0] for osl in os_data.values()])
1166 elif field == "node_status":
1168 for node_name, nos_list in os_data.iteritems():
1169 val[node_name] = [(v.status, v.path) for v in nos_list]
1171 raise errors.ParameterError(field)
1178 class LURemoveNode(LogicalUnit):
1179 """Logical unit for removing a node.
1182 HPATH = "node-remove"
1183 HTYPE = constants.HTYPE_NODE
1184 _OP_REQP = ["node_name"]
1186 def BuildHooksEnv(self):
1189 This doesn't run on the target node in the pre phase as a failed
1190 node would not allows itself to run.
1194 "OP_TARGET": self.op.node_name,
1195 "NODE_NAME": self.op.node_name,
1197 all_nodes = self.cfg.GetNodeList()
1198 all_nodes.remove(self.op.node_name)
1199 return env, all_nodes, all_nodes
1201 def CheckPrereq(self):
1202 """Check prerequisites.
1205 - the node exists in the configuration
1206 - it does not have primary or secondary instances
1207 - it's not the master
1209 Any errors are signalled by raising errors.OpPrereqError.
1212 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1214 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1216 instance_list = self.cfg.GetInstanceList()
1218 masternode = self.sstore.GetMasterNode()
1219 if node.name == masternode:
1220 raise errors.OpPrereqError("Node is the master node,"
1221 " you need to failover first.")
1223 for instance_name in instance_list:
1224 instance = self.cfg.GetInstanceInfo(instance_name)
1225 if node.name == instance.primary_node:
1226 raise errors.OpPrereqError("Instance %s still running on the node,"
1227 " please remove first." % instance_name)
1228 if node.name in instance.secondary_nodes:
1229 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1230 " please remove first." % instance_name)
1231 self.op.node_name = node.name
1234 def Exec(self, feedback_fn):
1235 """Removes the node from the cluster.
1239 logger.Info("stopping the node daemon and removing configs from node %s" %
1242 rpc.call_node_leave_cluster(node.name)
1244 self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
1246 logger.Info("Removing node %s from config" % node.name)
1248 self.cfg.RemoveNode(node.name)
1250 utils.RemoveHostFromEtcHosts(node.name)
1253 class LUQueryNodes(NoHooksLU):
1254 """Logical unit for querying nodes.
1257 _OP_REQP = ["output_fields", "names"]
1259 def CheckPrereq(self):
1260 """Check prerequisites.
1262 This checks that the fields required are valid output fields.
1265 self.dynamic_fields = frozenset([
1267 "mtotal", "mnode", "mfree",
1272 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1273 "pinst_list", "sinst_list",
1275 dynamic=self.dynamic_fields,
1276 selected=self.op.output_fields)
1278 self.wanted = _GetWantedNodes(self, self.op.names)
1280 def Exec(self, feedback_fn):
1281 """Computes the list of nodes and their attributes.
1284 nodenames = self.wanted
1285 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1287 # begin data gathering
1289 if self.dynamic_fields.intersection(self.op.output_fields):
1291 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1292 for name in nodenames:
1293 nodeinfo = node_data.get(name, None)
1296 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1297 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1298 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1299 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1300 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1301 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1302 "bootid": nodeinfo['bootid'],
1305 live_data[name] = {}
1307 live_data = dict.fromkeys(nodenames, {})
1309 node_to_primary = dict([(name, set()) for name in nodenames])
1310 node_to_secondary = dict([(name, set()) for name in nodenames])
1312 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1313 "sinst_cnt", "sinst_list"))
1314 if inst_fields & frozenset(self.op.output_fields):
1315 instancelist = self.cfg.GetInstanceList()
1317 for instance_name in instancelist:
1318 inst = self.cfg.GetInstanceInfo(instance_name)
1319 if inst.primary_node in node_to_primary:
1320 node_to_primary[inst.primary_node].add(inst.name)
1321 for secnode in inst.secondary_nodes:
1322 if secnode in node_to_secondary:
1323 node_to_secondary[secnode].add(inst.name)
1325 # end data gathering
1328 for node in nodelist:
1330 for field in self.op.output_fields:
1333 elif field == "pinst_list":
1334 val = list(node_to_primary[node.name])
1335 elif field == "sinst_list":
1336 val = list(node_to_secondary[node.name])
1337 elif field == "pinst_cnt":
1338 val = len(node_to_primary[node.name])
1339 elif field == "sinst_cnt":
1340 val = len(node_to_secondary[node.name])
1341 elif field == "pip":
1342 val = node.primary_ip
1343 elif field == "sip":
1344 val = node.secondary_ip
1345 elif field in self.dynamic_fields:
1346 val = live_data[node.name].get(field, None)
1348 raise errors.ParameterError(field)
1349 node_output.append(val)
1350 output.append(node_output)
1355 class LUQueryNodeVolumes(NoHooksLU):
1356 """Logical unit for getting volumes on node(s).
1359 _OP_REQP = ["nodes", "output_fields"]
1361 def CheckPrereq(self):
1362 """Check prerequisites.
1364 This checks that the fields required are valid output fields.
1367 self.nodes = _GetWantedNodes(self, self.op.nodes)
1369 _CheckOutputFields(static=["node"],
1370 dynamic=["phys", "vg", "name", "size", "instance"],
1371 selected=self.op.output_fields)
1374 def Exec(self, feedback_fn):
1375 """Computes the list of nodes and their attributes.
1378 nodenames = self.nodes
1379 volumes = rpc.call_node_volumes(nodenames)
1381 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1382 in self.cfg.GetInstanceList()]
1384 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1387 for node in nodenames:
1388 if node not in volumes or not volumes[node]:
1391 node_vols = volumes[node][:]
1392 node_vols.sort(key=lambda vol: vol['dev'])
1394 for vol in node_vols:
1396 for field in self.op.output_fields:
1399 elif field == "phys":
1403 elif field == "name":
1405 elif field == "size":
1406 val = int(float(vol['size']))
1407 elif field == "instance":
1409 if node not in lv_by_node[inst]:
1411 if vol['name'] in lv_by_node[inst][node]:
1417 raise errors.ParameterError(field)
1418 node_output.append(str(val))
1420 output.append(node_output)
1425 class LUAddNode(LogicalUnit):
1426 """Logical unit for adding node to the cluster.
1430 HTYPE = constants.HTYPE_NODE
1431 _OP_REQP = ["node_name"]
1433 def BuildHooksEnv(self):
1436 This will run on all nodes before, and on all nodes + the new node after.
1440 "OP_TARGET": self.op.node_name,
1441 "NODE_NAME": self.op.node_name,
1442 "NODE_PIP": self.op.primary_ip,
1443 "NODE_SIP": self.op.secondary_ip,
1445 nodes_0 = self.cfg.GetNodeList()
1446 nodes_1 = nodes_0 + [self.op.node_name, ]
1447 return env, nodes_0, nodes_1
1449 def CheckPrereq(self):
1450 """Check prerequisites.
1453 - the new node is not already in the config
1455 - its parameters (single/dual homed) matches the cluster
1457 Any errors are signalled by raising errors.OpPrereqError.
1460 node_name = self.op.node_name
1463 dns_data = utils.HostInfo(node_name)
1465 node = dns_data.name
1466 primary_ip = self.op.primary_ip = dns_data.ip
1467 secondary_ip = getattr(self.op, "secondary_ip", None)
1468 if secondary_ip is None:
1469 secondary_ip = primary_ip
1470 if not utils.IsValidIP(secondary_ip):
1471 raise errors.OpPrereqError("Invalid secondary IP given")
1472 self.op.secondary_ip = secondary_ip
1474 node_list = cfg.GetNodeList()
1475 if not self.op.readd and node in node_list:
1476 raise errors.OpPrereqError("Node %s is already in the configuration" %
1478 elif self.op.readd and node not in node_list:
1479 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1481 for existing_node_name in node_list:
1482 existing_node = cfg.GetNodeInfo(existing_node_name)
1484 if self.op.readd and node == existing_node_name:
1485 if (existing_node.primary_ip != primary_ip or
1486 existing_node.secondary_ip != secondary_ip):
1487 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1488 " address configuration as before")
1491 if (existing_node.primary_ip == primary_ip or
1492 existing_node.secondary_ip == primary_ip or
1493 existing_node.primary_ip == secondary_ip or
1494 existing_node.secondary_ip == secondary_ip):
1495 raise errors.OpPrereqError("New node ip address(es) conflict with"
1496 " existing node %s" % existing_node.name)
1498 # check that the type of the node (single versus dual homed) is the
1499 # same as for the master
1500 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1501 master_singlehomed = myself.secondary_ip == myself.primary_ip
1502 newbie_singlehomed = secondary_ip == primary_ip
1503 if master_singlehomed != newbie_singlehomed:
1504 if master_singlehomed:
1505 raise errors.OpPrereqError("The master has no private ip but the"
1506 " new node has one")
1508 raise errors.OpPrereqError("The master has a private ip but the"
1509 " new node doesn't have one")
1511 # checks reachablity
1512 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1513 raise errors.OpPrereqError("Node not reachable by ping")
1515 if not newbie_singlehomed:
1516 # check reachability from my secondary ip to newbie's secondary ip
1517 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1518 source=myself.secondary_ip):
1519 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1520 " based ping to noded port")
1522 self.new_node = objects.Node(name=node,
1523 primary_ip=primary_ip,
1524 secondary_ip=secondary_ip)
1526 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1527 if not os.path.exists(constants.VNC_PASSWORD_FILE):
1528 raise errors.OpPrereqError("Cluster VNC password file %s missing" %
1529 constants.VNC_PASSWORD_FILE)
1531 def Exec(self, feedback_fn):
1532 """Adds the new node to the cluster.
1535 new_node = self.new_node
1536 node = new_node.name
1538 # set up inter-node password and certificate and restarts the node daemon
1539 gntpass = self.sstore.GetNodeDaemonPassword()
1540 if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
1541 raise errors.OpExecError("ganeti password corruption detected")
1542 f = open(constants.SSL_CERT_FILE)
1544 gntpem = f.read(8192)
1547 # in the base64 pem encoding, neither '!' nor '.' are valid chars,
1548 # so we use this to detect an invalid certificate; as long as the
1549 # cert doesn't contain this, the here-document will be correctly
1550 # parsed by the shell sequence below
1551 if re.search('^!EOF\.', gntpem, re.MULTILINE):
1552 raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
1553 if not gntpem.endswith("\n"):
1554 raise errors.OpExecError("PEM must end with newline")
1555 logger.Info("copy cluster pass to %s and starting the node daemon" % node)
1557 # and then connect with ssh to set password and start ganeti-noded
1558 # note that all the below variables are sanitized at this point,
1559 # either by being constants or by the checks above
1561 mycommand = ("umask 077 && "
1562 "echo '%s' > '%s' && "
1563 "cat > '%s' << '!EOF.' && \n"
1564 "%s!EOF.\n%s restart" %
1565 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
1566 constants.SSL_CERT_FILE, gntpem,
1567 constants.NODE_INITD_SCRIPT))
1569 result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
1571 raise errors.OpExecError("Remote command on node %s, error: %s,"
1573 (node, result.fail_reason, result.output))
1575 # check connectivity
1578 result = rpc.call_version([node])[node]
1580 if constants.PROTOCOL_VERSION == result:
1581 logger.Info("communication to node %s fine, sw version %s match" %
1584 raise errors.OpExecError("Version mismatch master version %s,"
1585 " node version %s" %
1586 (constants.PROTOCOL_VERSION, result))
1588 raise errors.OpExecError("Cannot get version from the new node")
1591 logger.Info("copy ssh key to node %s" % node)
1592 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1594 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1595 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1601 keyarray.append(f.read())
1605 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1606 keyarray[3], keyarray[4], keyarray[5])
1609 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1611 # Add node to our /etc/hosts, and add key to known_hosts
1612 utils.AddHostToEtcHosts(new_node.name)
1614 if new_node.secondary_ip != new_node.primary_ip:
1615 if not rpc.call_node_tcp_ping(new_node.name,
1616 constants.LOCALHOST_IP_ADDRESS,
1617 new_node.secondary_ip,
1618 constants.DEFAULT_NODED_PORT,
1620 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1621 " you gave (%s). Please fix and re-run this"
1622 " command." % new_node.secondary_ip)
1624 success, msg = self.ssh.VerifyNodeHostname(node)
1626 raise errors.OpExecError("Node '%s' claims it has a different hostname"
1627 " than the one the resolver gives: %s."
1628 " Please fix and re-run this command." %
1631 # Distribute updated /etc/hosts and known_hosts to all nodes,
1632 # including the node just added
1633 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1634 dist_nodes = self.cfg.GetNodeList()
1635 if not self.op.readd:
1636 dist_nodes.append(node)
1637 if myself.name in dist_nodes:
1638 dist_nodes.remove(myself.name)
1640 logger.Debug("Copying hosts and known_hosts to all nodes")
1641 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1642 result = rpc.call_upload_file(dist_nodes, fname)
1643 for to_node in dist_nodes:
1644 if not result[to_node]:
1645 logger.Error("copy of file %s to node %s failed" %
1648 to_copy = ss.GetFileList()
1649 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1650 to_copy.append(constants.VNC_PASSWORD_FILE)
1651 for fname in to_copy:
1652 if not self.ssh.CopyFileToNode(node, fname):
1653 logger.Error("could not copy file %s to node %s" % (fname, node))
1655 if not self.op.readd:
1656 logger.Info("adding node %s to cluster.conf" % node)
1657 self.cfg.AddNode(new_node)
1660 class LUMasterFailover(LogicalUnit):
1661 """Failover the master node to the current node.
1663 This is a special LU in that it must run on a non-master node.
1666 HPATH = "master-failover"
1667 HTYPE = constants.HTYPE_CLUSTER
1672 def BuildHooksEnv(self):
1675 This will run on the new master only in the pre phase, and on all
1676 the nodes in the post phase.
1680 "OP_TARGET": self.new_master,
1681 "NEW_MASTER": self.new_master,
1682 "OLD_MASTER": self.old_master,
1684 return env, [self.new_master], self.cfg.GetNodeList()
1686 def CheckPrereq(self):
1687 """Check prerequisites.
1689 This checks that we are not already the master.
1692 self.new_master = utils.HostInfo().name
1693 self.old_master = self.sstore.GetMasterNode()
1695 if self.old_master == self.new_master:
1696 raise errors.OpPrereqError("This commands must be run on the node"
1697 " where you want the new master to be."
1698 " %s is already the master" %
1701 def Exec(self, feedback_fn):
1702 """Failover the master node.
1704 This command, when run on a non-master node, will cause the current
1705 master to cease being master, and the non-master to become new
1709 #TODO: do not rely on gethostname returning the FQDN
1710 logger.Info("setting master to %s, old master: %s" %
1711 (self.new_master, self.old_master))
1713 if not rpc.call_node_stop_master(self.old_master):
1714 logger.Error("could disable the master role on the old master"
1715 " %s, please disable manually" % self.old_master)
1718 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1719 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1720 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1721 logger.Error("could not distribute the new simple store master file"
1722 " to the other nodes, please check.")
1724 if not rpc.call_node_start_master(self.new_master):
1725 logger.Error("could not start the master role on the new master"
1726 " %s, please check" % self.new_master)
1727 feedback_fn("Error in activating the master IP on the new master,"
1728 " please fix manually.")
1732 class LUQueryClusterInfo(NoHooksLU):
1733 """Query cluster configuration.
1739 def CheckPrereq(self):
1740 """No prerequsites needed for this LU.
1745 def Exec(self, feedback_fn):
1746 """Return cluster config.
1750 "name": self.sstore.GetClusterName(),
1751 "software_version": constants.RELEASE_VERSION,
1752 "protocol_version": constants.PROTOCOL_VERSION,
1753 "config_version": constants.CONFIG_VERSION,
1754 "os_api_version": constants.OS_API_VERSION,
1755 "export_version": constants.EXPORT_VERSION,
1756 "master": self.sstore.GetMasterNode(),
1757 "architecture": (platform.architecture()[0], platform.machine()),
1758 "hypervisor_type": self.sstore.GetHypervisorType(),
1764 class LUClusterCopyFile(NoHooksLU):
1765 """Copy file to cluster.
1768 _OP_REQP = ["nodes", "filename"]
1770 def CheckPrereq(self):
1771 """Check prerequisites.
1773 It should check that the named file exists and that the given list
1777 if not os.path.exists(self.op.filename):
1778 raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
1780 self.nodes = _GetWantedNodes(self, self.op.nodes)
1782 def Exec(self, feedback_fn):
1783 """Copy a file from master to some nodes.
1786 opts - class with options as members
1787 args - list containing a single element, the file name
1789 nodes - list containing the name of target nodes; if empty, all nodes
1792 filename = self.op.filename
1794 myname = utils.HostInfo().name
1796 for node in self.nodes:
1799 if not self.ssh.CopyFileToNode(node, filename):
1800 logger.Error("Copy of file %s to node %s failed" % (filename, node))
1803 class LUDumpClusterConfig(NoHooksLU):
1804 """Return a text-representation of the cluster-config.
1809 def CheckPrereq(self):
1810 """No prerequisites.
1815 def Exec(self, feedback_fn):
1816 """Dump a representation of the cluster config to the standard output.
1819 return self.cfg.DumpConfig()
1822 class LURunClusterCommand(NoHooksLU):
1823 """Run a command on some nodes.
1826 _OP_REQP = ["command", "nodes"]
1828 def CheckPrereq(self):
1829 """Check prerequisites.
1831 It checks that the given list of nodes is valid.
1834 self.nodes = _GetWantedNodes(self, self.op.nodes)
1836 def Exec(self, feedback_fn):
1837 """Run a command on some nodes.
1840 # put the master at the end of the nodes list
1841 master_node = self.sstore.GetMasterNode()
1842 if master_node in self.nodes:
1843 self.nodes.remove(master_node)
1844 self.nodes.append(master_node)
1847 for node in self.nodes:
1848 result = self.ssh.Run(node, "root", self.op.command)
1849 data.append((node, result.output, result.exit_code))
1854 class LUActivateInstanceDisks(NoHooksLU):
1855 """Bring up an instance's disks.
1858 _OP_REQP = ["instance_name"]
1860 def CheckPrereq(self):
1861 """Check prerequisites.
1863 This checks that the instance is in the cluster.
1866 instance = self.cfg.GetInstanceInfo(
1867 self.cfg.ExpandInstanceName(self.op.instance_name))
1868 if instance is None:
1869 raise errors.OpPrereqError("Instance '%s' not known" %
1870 self.op.instance_name)
1871 self.instance = instance
1874 def Exec(self, feedback_fn):
1875 """Activate the disks.
1878 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1880 raise errors.OpExecError("Cannot activate block devices")
1885 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1886 """Prepare the block devices for an instance.
1888 This sets up the block devices on all nodes.
1891 instance: a ganeti.objects.Instance object
1892 ignore_secondaries: if true, errors on secondary nodes won't result
1893 in an error return from the function
1896 false if the operation failed
1897 list of (host, instance_visible_name, node_visible_name) if the operation
1898 suceeded with the mapping from node devices to instance devices
1902 iname = instance.name
1903 # With the two passes mechanism we try to reduce the window of
1904 # opportunity for the race condition of switching DRBD to primary
1905 # before handshaking occured, but we do not eliminate it
1907 # The proper fix would be to wait (with some limits) until the
1908 # connection has been made and drbd transitions from WFConnection
1909 # into any other network-connected state (Connected, SyncTarget,
1912 # 1st pass, assemble on all nodes in secondary mode
1913 for inst_disk in instance.disks:
1914 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1915 cfg.SetDiskID(node_disk, node)
1916 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1918 logger.Error("could not prepare block device %s on node %s"
1919 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1920 if not ignore_secondaries:
1923 # FIXME: race condition on drbd migration to primary
1925 # 2nd pass, do only the primary node
1926 for inst_disk in instance.disks:
1927 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1928 if node != instance.primary_node:
1930 cfg.SetDiskID(node_disk, node)
1931 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1933 logger.Error("could not prepare block device %s on node %s"
1934 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1936 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1938 # leave the disks configured for the primary node
1939 # this is a workaround that would be fixed better by
1940 # improving the logical/physical id handling
1941 for disk in instance.disks:
1942 cfg.SetDiskID(disk, instance.primary_node)
1944 return disks_ok, device_info
1947 def _StartInstanceDisks(cfg, instance, force):
1948 """Start the disks of an instance.
1951 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1952 ignore_secondaries=force)
1954 _ShutdownInstanceDisks(instance, cfg)
1955 if force is not None and not force:
1956 logger.Error("If the message above refers to a secondary node,"
1957 " you can retry the operation using '--force'.")
1958 raise errors.OpExecError("Disk consistency error")
1961 class LUDeactivateInstanceDisks(NoHooksLU):
1962 """Shutdown an instance's disks.
1965 _OP_REQP = ["instance_name"]
1967 def CheckPrereq(self):
1968 """Check prerequisites.
1970 This checks that the instance is in the cluster.
1973 instance = self.cfg.GetInstanceInfo(
1974 self.cfg.ExpandInstanceName(self.op.instance_name))
1975 if instance is None:
1976 raise errors.OpPrereqError("Instance '%s' not known" %
1977 self.op.instance_name)
1978 self.instance = instance
1980 def Exec(self, feedback_fn):
1981 """Deactivate the disks
1984 instance = self.instance
1985 ins_l = rpc.call_instance_list([instance.primary_node])
1986 ins_l = ins_l[instance.primary_node]
1987 if not type(ins_l) is list:
1988 raise errors.OpExecError("Can't contact node '%s'" %
1989 instance.primary_node)
1991 if self.instance.name in ins_l:
1992 raise errors.OpExecError("Instance is running, can't shutdown"
1995 _ShutdownInstanceDisks(instance, self.cfg)
1998 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1999 """Shutdown block devices of an instance.
2001 This does the shutdown on all nodes of the instance.
2003 If the ignore_primary is false, errors on the primary node are
2008 for disk in instance.disks:
2009 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2010 cfg.SetDiskID(top_disk, node)
2011 if not rpc.call_blockdev_shutdown(node, top_disk):
2012 logger.Error("could not shutdown block device %s on node %s" %
2013 (disk.iv_name, node))
2014 if not ignore_primary or node != instance.primary_node:
2019 def _CheckNodeFreeMemory(cfg, node, reason, requested):
2020 """Checks if a node has enough free memory.
2022 This function check if a given node has the needed amount of free
2023 memory. In case the node has less memory or we cannot get the
2024 information from the node, this function raise an OpPrereqError
2028 - cfg: a ConfigWriter instance
2029 - node: the node name
2030 - reason: string to use in the error message
2031 - requested: the amount of memory in MiB
2034 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
2035 if not nodeinfo or not isinstance(nodeinfo, dict):
2036 raise errors.OpPrereqError("Could not contact node %s for resource"
2037 " information" % (node,))
2039 free_mem = nodeinfo[node].get('memory_free')
2040 if not isinstance(free_mem, int):
2041 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2042 " was '%s'" % (node, free_mem))
2043 if requested > free_mem:
2044 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2045 " needed %s MiB, available %s MiB" %
2046 (node, reason, requested, free_mem))
2049 class LUStartupInstance(LogicalUnit):
2050 """Starts an instance.
2053 HPATH = "instance-start"
2054 HTYPE = constants.HTYPE_INSTANCE
2055 _OP_REQP = ["instance_name", "force"]
2057 def BuildHooksEnv(self):
2060 This runs on master, primary and secondary nodes of the instance.
2064 "FORCE": self.op.force,
2066 env.update(_BuildInstanceHookEnvByObject(self.instance))
2067 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2068 list(self.instance.secondary_nodes))
2071 def CheckPrereq(self):
2072 """Check prerequisites.
2074 This checks that the instance is in the cluster.
2077 instance = self.cfg.GetInstanceInfo(
2078 self.cfg.ExpandInstanceName(self.op.instance_name))
2079 if instance is None:
2080 raise errors.OpPrereqError("Instance '%s' not known" %
2081 self.op.instance_name)
2083 # check bridges existance
2084 _CheckInstanceBridgesExist(instance)
2086 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2087 "starting instance %s" % instance.name,
2090 self.instance = instance
2091 self.op.instance_name = instance.name
2093 def Exec(self, feedback_fn):
2094 """Start the instance.
2097 instance = self.instance
2098 force = self.op.force
2099 extra_args = getattr(self.op, "extra_args", "")
2101 self.cfg.MarkInstanceUp(instance.name)
2103 node_current = instance.primary_node
2105 _StartInstanceDisks(self.cfg, instance, force)
2107 if not rpc.call_instance_start(node_current, instance, extra_args):
2108 _ShutdownInstanceDisks(instance, self.cfg)
2109 raise errors.OpExecError("Could not start instance")
2112 class LURebootInstance(LogicalUnit):
2113 """Reboot an instance.
2116 HPATH = "instance-reboot"
2117 HTYPE = constants.HTYPE_INSTANCE
2118 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2120 def BuildHooksEnv(self):
2123 This runs on master, primary and secondary nodes of the instance.
2127 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2129 env.update(_BuildInstanceHookEnvByObject(self.instance))
2130 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2131 list(self.instance.secondary_nodes))
2134 def CheckPrereq(self):
2135 """Check prerequisites.
2137 This checks that the instance is in the cluster.
2140 instance = self.cfg.GetInstanceInfo(
2141 self.cfg.ExpandInstanceName(self.op.instance_name))
2142 if instance is None:
2143 raise errors.OpPrereqError("Instance '%s' not known" %
2144 self.op.instance_name)
2146 # check bridges existance
2147 _CheckInstanceBridgesExist(instance)
2149 self.instance = instance
2150 self.op.instance_name = instance.name
2152 def Exec(self, feedback_fn):
2153 """Reboot the instance.
2156 instance = self.instance
2157 ignore_secondaries = self.op.ignore_secondaries
2158 reboot_type = self.op.reboot_type
2159 extra_args = getattr(self.op, "extra_args", "")
2161 node_current = instance.primary_node
2163 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2164 constants.INSTANCE_REBOOT_HARD,
2165 constants.INSTANCE_REBOOT_FULL]:
2166 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2167 (constants.INSTANCE_REBOOT_SOFT,
2168 constants.INSTANCE_REBOOT_HARD,
2169 constants.INSTANCE_REBOOT_FULL))
2171 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2172 constants.INSTANCE_REBOOT_HARD]:
2173 if not rpc.call_instance_reboot(node_current, instance,
2174 reboot_type, extra_args):
2175 raise errors.OpExecError("Could not reboot instance")
2177 if not rpc.call_instance_shutdown(node_current, instance):
2178 raise errors.OpExecError("could not shutdown instance for full reboot")
2179 _ShutdownInstanceDisks(instance, self.cfg)
2180 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2181 if not rpc.call_instance_start(node_current, instance, extra_args):
2182 _ShutdownInstanceDisks(instance, self.cfg)
2183 raise errors.OpExecError("Could not start instance for full reboot")
2185 self.cfg.MarkInstanceUp(instance.name)
2188 class LUShutdownInstance(LogicalUnit):
2189 """Shutdown an instance.
2192 HPATH = "instance-stop"
2193 HTYPE = constants.HTYPE_INSTANCE
2194 _OP_REQP = ["instance_name"]
2196 def BuildHooksEnv(self):
2199 This runs on master, primary and secondary nodes of the instance.
2202 env = _BuildInstanceHookEnvByObject(self.instance)
2203 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2204 list(self.instance.secondary_nodes))
2207 def CheckPrereq(self):
2208 """Check prerequisites.
2210 This checks that the instance is in the cluster.
2213 instance = self.cfg.GetInstanceInfo(
2214 self.cfg.ExpandInstanceName(self.op.instance_name))
2215 if instance is None:
2216 raise errors.OpPrereqError("Instance '%s' not known" %
2217 self.op.instance_name)
2218 self.instance = instance
2220 def Exec(self, feedback_fn):
2221 """Shutdown the instance.
2224 instance = self.instance
2225 node_current = instance.primary_node
2226 self.cfg.MarkInstanceDown(instance.name)
2227 if not rpc.call_instance_shutdown(node_current, instance):
2228 logger.Error("could not shutdown instance")
2230 _ShutdownInstanceDisks(instance, self.cfg)
2233 class LUReinstallInstance(LogicalUnit):
2234 """Reinstall an instance.
2237 HPATH = "instance-reinstall"
2238 HTYPE = constants.HTYPE_INSTANCE
2239 _OP_REQP = ["instance_name"]
2241 def BuildHooksEnv(self):
2244 This runs on master, primary and secondary nodes of the instance.
2247 env = _BuildInstanceHookEnvByObject(self.instance)
2248 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2249 list(self.instance.secondary_nodes))
2252 def CheckPrereq(self):
2253 """Check prerequisites.
2255 This checks that the instance is in the cluster and is not running.
2258 instance = self.cfg.GetInstanceInfo(
2259 self.cfg.ExpandInstanceName(self.op.instance_name))
2260 if instance is None:
2261 raise errors.OpPrereqError("Instance '%s' not known" %
2262 self.op.instance_name)
2263 if instance.disk_template == constants.DT_DISKLESS:
2264 raise errors.OpPrereqError("Instance '%s' has no disks" %
2265 self.op.instance_name)
2266 if instance.status != "down":
2267 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2268 self.op.instance_name)
2269 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2271 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2272 (self.op.instance_name,
2273 instance.primary_node))
2275 self.op.os_type = getattr(self.op, "os_type", None)
2276 if self.op.os_type is not None:
2278 pnode = self.cfg.GetNodeInfo(
2279 self.cfg.ExpandNodeName(instance.primary_node))
2281 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2283 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2285 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2286 " primary node" % self.op.os_type)
2288 self.instance = instance
2290 def Exec(self, feedback_fn):
2291 """Reinstall the instance.
2294 inst = self.instance
2296 if self.op.os_type is not None:
2297 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2298 inst.os = self.op.os_type
2299 self.cfg.AddInstance(inst)
2301 _StartInstanceDisks(self.cfg, inst, None)
2303 feedback_fn("Running the instance OS create scripts...")
2304 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2305 raise errors.OpExecError("Could not install OS for instance %s"
2307 (inst.name, inst.primary_node))
2309 _ShutdownInstanceDisks(inst, self.cfg)
2312 class LURenameInstance(LogicalUnit):
2313 """Rename an instance.
2316 HPATH = "instance-rename"
2317 HTYPE = constants.HTYPE_INSTANCE
2318 _OP_REQP = ["instance_name", "new_name"]
2320 def BuildHooksEnv(self):
2323 This runs on master, primary and secondary nodes of the instance.
2326 env = _BuildInstanceHookEnvByObject(self.instance)
2327 env["INSTANCE_NEW_NAME"] = self.op.new_name
2328 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2329 list(self.instance.secondary_nodes))
2332 def CheckPrereq(self):
2333 """Check prerequisites.
2335 This checks that the instance is in the cluster and is not running.
2338 instance = self.cfg.GetInstanceInfo(
2339 self.cfg.ExpandInstanceName(self.op.instance_name))
2340 if instance is None:
2341 raise errors.OpPrereqError("Instance '%s' not known" %
2342 self.op.instance_name)
2343 if instance.status != "down":
2344 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2345 self.op.instance_name)
2346 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2348 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2349 (self.op.instance_name,
2350 instance.primary_node))
2351 self.instance = instance
2353 # new name verification
2354 name_info = utils.HostInfo(self.op.new_name)
2356 self.op.new_name = new_name = name_info.name
2357 instance_list = self.cfg.GetInstanceList()
2358 if new_name in instance_list:
2359 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2362 if not getattr(self.op, "ignore_ip", False):
2363 command = ["fping", "-q", name_info.ip]
2364 result = utils.RunCmd(command)
2365 if not result.failed:
2366 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2367 (name_info.ip, new_name))
2370 def Exec(self, feedback_fn):
2371 """Reinstall the instance.
2374 inst = self.instance
2375 old_name = inst.name
2377 if inst.disk_template == constants.DT_FILE:
2378 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2380 self.cfg.RenameInstance(inst.name, self.op.new_name)
2382 # re-read the instance from the configuration after rename
2383 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2385 if inst.disk_template == constants.DT_FILE:
2386 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2387 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2388 old_file_storage_dir,
2389 new_file_storage_dir)
2392 raise errors.OpExecError("Could not connect to node '%s' to rename"
2393 " directory '%s' to '%s' (but the instance"
2394 " has been renamed in Ganeti)" % (
2395 inst.primary_node, old_file_storage_dir,
2396 new_file_storage_dir))
2399 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2400 " (but the instance has been renamed in"
2401 " Ganeti)" % (old_file_storage_dir,
2402 new_file_storage_dir))
2404 _StartInstanceDisks(self.cfg, inst, None)
2406 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2408 msg = ("Could run OS rename script for instance %s on node %s (but the"
2409 " instance has been renamed in Ganeti)" %
2410 (inst.name, inst.primary_node))
2413 _ShutdownInstanceDisks(inst, self.cfg)
2416 class LURemoveInstance(LogicalUnit):
2417 """Remove an instance.
2420 HPATH = "instance-remove"
2421 HTYPE = constants.HTYPE_INSTANCE
2422 _OP_REQP = ["instance_name", "ignore_failures"]
2424 def BuildHooksEnv(self):
2427 This runs on master, primary and secondary nodes of the instance.
2430 env = _BuildInstanceHookEnvByObject(self.instance)
2431 nl = [self.sstore.GetMasterNode()]
2434 def CheckPrereq(self):
2435 """Check prerequisites.
2437 This checks that the instance is in the cluster.
2440 instance = self.cfg.GetInstanceInfo(
2441 self.cfg.ExpandInstanceName(self.op.instance_name))
2442 if instance is None:
2443 raise errors.OpPrereqError("Instance '%s' not known" %
2444 self.op.instance_name)
2445 self.instance = instance
2447 def Exec(self, feedback_fn):
2448 """Remove the instance.
2451 instance = self.instance
2452 logger.Info("shutting down instance %s on node %s" %
2453 (instance.name, instance.primary_node))
2455 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2456 if self.op.ignore_failures:
2457 feedback_fn("Warning: can't shutdown instance")
2459 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2460 (instance.name, instance.primary_node))
2462 logger.Info("removing block devices for instance %s" % instance.name)
2464 if not _RemoveDisks(instance, self.cfg):
2465 if self.op.ignore_failures:
2466 feedback_fn("Warning: can't remove instance's disks")
2468 raise errors.OpExecError("Can't remove instance's disks")
2470 logger.Info("removing instance %s out of cluster config" % instance.name)
2472 self.cfg.RemoveInstance(instance.name)
2475 class LUQueryInstances(NoHooksLU):
2476 """Logical unit for querying instances.
2479 _OP_REQP = ["output_fields", "names"]
2481 def CheckPrereq(self):
2482 """Check prerequisites.
2484 This checks that the fields required are valid output fields.
2487 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2488 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2489 "admin_state", "admin_ram",
2490 "disk_template", "ip", "mac", "bridge",
2491 "sda_size", "sdb_size", "vcpus"],
2492 dynamic=self.dynamic_fields,
2493 selected=self.op.output_fields)
2495 self.wanted = _GetWantedInstances(self, self.op.names)
2497 def Exec(self, feedback_fn):
2498 """Computes the list of nodes and their attributes.
2501 instance_names = self.wanted
2502 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2505 # begin data gathering
2507 nodes = frozenset([inst.primary_node for inst in instance_list])
2510 if self.dynamic_fields.intersection(self.op.output_fields):
2512 node_data = rpc.call_all_instances_info(nodes)
2514 result = node_data[name]
2516 live_data.update(result)
2517 elif result == False:
2518 bad_nodes.append(name)
2519 # else no instance is alive
2521 live_data = dict([(name, {}) for name in instance_names])
2523 # end data gathering
2526 for instance in instance_list:
2528 for field in self.op.output_fields:
2533 elif field == "pnode":
2534 val = instance.primary_node
2535 elif field == "snodes":
2536 val = list(instance.secondary_nodes)
2537 elif field == "admin_state":
2538 val = (instance.status != "down")
2539 elif field == "oper_state":
2540 if instance.primary_node in bad_nodes:
2543 val = bool(live_data.get(instance.name))
2544 elif field == "status":
2545 if instance.primary_node in bad_nodes:
2546 val = "ERROR_nodedown"
2548 running = bool(live_data.get(instance.name))
2550 if instance.status != "down":
2555 if instance.status != "down":
2559 elif field == "admin_ram":
2560 val = instance.memory
2561 elif field == "oper_ram":
2562 if instance.primary_node in bad_nodes:
2564 elif instance.name in live_data:
2565 val = live_data[instance.name].get("memory", "?")
2568 elif field == "disk_template":
2569 val = instance.disk_template
2571 val = instance.nics[0].ip
2572 elif field == "bridge":
2573 val = instance.nics[0].bridge
2574 elif field == "mac":
2575 val = instance.nics[0].mac
2576 elif field == "sda_size" or field == "sdb_size":
2577 disk = instance.FindDisk(field[:3])
2582 elif field == "vcpus":
2583 val = instance.vcpus
2585 raise errors.ParameterError(field)
2592 class LUFailoverInstance(LogicalUnit):
2593 """Failover an instance.
2596 HPATH = "instance-failover"
2597 HTYPE = constants.HTYPE_INSTANCE
2598 _OP_REQP = ["instance_name", "ignore_consistency"]
2600 def BuildHooksEnv(self):
2603 This runs on master, primary and secondary nodes of the instance.
2607 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2609 env.update(_BuildInstanceHookEnvByObject(self.instance))
2610 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2613 def CheckPrereq(self):
2614 """Check prerequisites.
2616 This checks that the instance is in the cluster.
2619 instance = self.cfg.GetInstanceInfo(
2620 self.cfg.ExpandInstanceName(self.op.instance_name))
2621 if instance is None:
2622 raise errors.OpPrereqError("Instance '%s' not known" %
2623 self.op.instance_name)
2625 if instance.disk_template not in constants.DTS_NET_MIRROR:
2626 raise errors.OpPrereqError("Instance's disk layout is not"
2627 " network mirrored, cannot failover.")
2629 secondary_nodes = instance.secondary_nodes
2630 if not secondary_nodes:
2631 raise errors.ProgrammerError("no secondary node but using "
2632 "a mirrored disk template")
2634 target_node = secondary_nodes[0]
2635 # check memory requirements on the secondary node
2636 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2637 instance.name, instance.memory)
2639 # check bridge existance
2640 brlist = [nic.bridge for nic in instance.nics]
2641 if not rpc.call_bridges_exist(target_node, brlist):
2642 raise errors.OpPrereqError("One or more target bridges %s does not"
2643 " exist on destination node '%s'" %
2644 (brlist, target_node))
2646 self.instance = instance
2648 def Exec(self, feedback_fn):
2649 """Failover an instance.
2651 The failover is done by shutting it down on its present node and
2652 starting it on the secondary.
2655 instance = self.instance
2657 source_node = instance.primary_node
2658 target_node = instance.secondary_nodes[0]
2660 feedback_fn("* checking disk consistency between source and target")
2661 for dev in instance.disks:
2662 # for drbd, these are drbd over lvm
2663 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2664 if instance.status == "up" and not self.op.ignore_consistency:
2665 raise errors.OpExecError("Disk %s is degraded on target node,"
2666 " aborting failover." % dev.iv_name)
2668 feedback_fn("* shutting down instance on source node")
2669 logger.Info("Shutting down instance %s on node %s" %
2670 (instance.name, source_node))
2672 if not rpc.call_instance_shutdown(source_node, instance):
2673 if self.op.ignore_consistency:
2674 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2675 " anyway. Please make sure node %s is down" %
2676 (instance.name, source_node, source_node))
2678 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2679 (instance.name, source_node))
2681 feedback_fn("* deactivating the instance's disks on source node")
2682 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2683 raise errors.OpExecError("Can't shut down the instance's disks.")
2685 instance.primary_node = target_node
2686 # distribute new instance config to the other nodes
2687 self.cfg.Update(instance)
2689 # Only start the instance if it's marked as up
2690 if instance.status == "up":
2691 feedback_fn("* activating the instance's disks on target node")
2692 logger.Info("Starting instance %s on node %s" %
2693 (instance.name, target_node))
2695 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2696 ignore_secondaries=True)
2698 _ShutdownInstanceDisks(instance, self.cfg)
2699 raise errors.OpExecError("Can't activate the instance's disks")
2701 feedback_fn("* starting the instance on the target node")
2702 if not rpc.call_instance_start(target_node, instance, None):
2703 _ShutdownInstanceDisks(instance, self.cfg)
2704 raise errors.OpExecError("Could not start instance %s on node %s." %
2705 (instance.name, target_node))
2708 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2709 """Create a tree of block devices on the primary node.
2711 This always creates all devices.
2715 for child in device.children:
2716 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2719 cfg.SetDiskID(device, node)
2720 new_id = rpc.call_blockdev_create(node, device, device.size,
2721 instance.name, True, info)
2724 if device.physical_id is None:
2725 device.physical_id = new_id
2729 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2730 """Create a tree of block devices on a secondary node.
2732 If this device type has to be created on secondaries, create it and
2735 If not, just recurse to children keeping the same 'force' value.
2738 if device.CreateOnSecondary():
2741 for child in device.children:
2742 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2743 child, force, info):
2748 cfg.SetDiskID(device, node)
2749 new_id = rpc.call_blockdev_create(node, device, device.size,
2750 instance.name, False, info)
2753 if device.physical_id is None:
2754 device.physical_id = new_id
2758 def _GenerateUniqueNames(cfg, exts):
2759 """Generate a suitable LV name.
2761 This will generate a logical volume name for the given instance.
2766 new_id = cfg.GenerateUniqueID()
2767 results.append("%s%s" % (new_id, val))
2771 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2772 """Generate a drbd device complete with its children.
2775 port = cfg.AllocatePort()
2776 vgname = cfg.GetVGName()
2777 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2778 logical_id=(vgname, names[0]))
2779 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2780 logical_id=(vgname, names[1]))
2781 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2782 logical_id = (primary, secondary, port),
2783 children = [dev_data, dev_meta])
2787 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2788 """Generate a drbd8 device complete with its children.
2791 port = cfg.AllocatePort()
2792 vgname = cfg.GetVGName()
2793 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2794 logical_id=(vgname, names[0]))
2795 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2796 logical_id=(vgname, names[1]))
2797 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2798 logical_id = (primary, secondary, port),
2799 children = [dev_data, dev_meta],
2804 def _GenerateDiskTemplate(cfg, template_name,
2805 instance_name, primary_node,
2806 secondary_nodes, disk_sz, swap_sz,
2807 file_storage_dir, file_driver):
2808 """Generate the entire disk layout for a given template type.
2811 #TODO: compute space requirements
2813 vgname = cfg.GetVGName()
2814 if template_name == constants.DT_DISKLESS:
2816 elif template_name == constants.DT_PLAIN:
2817 if len(secondary_nodes) != 0:
2818 raise errors.ProgrammerError("Wrong template configuration")
2820 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2821 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2822 logical_id=(vgname, names[0]),
2824 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2825 logical_id=(vgname, names[1]),
2827 disks = [sda_dev, sdb_dev]
2828 elif template_name == constants.DT_DRBD8:
2829 if len(secondary_nodes) != 1:
2830 raise errors.ProgrammerError("Wrong template configuration")
2831 remote_node = secondary_nodes[0]
2832 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2833 ".sdb_data", ".sdb_meta"])
2834 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2835 disk_sz, names[0:2], "sda")
2836 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2837 swap_sz, names[2:4], "sdb")
2838 disks = [drbd_sda_dev, drbd_sdb_dev]
2839 elif template_name == constants.DT_FILE:
2840 if len(secondary_nodes) != 0:
2841 raise errors.ProgrammerError("Wrong template configuration")
2843 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2844 iv_name="sda", logical_id=(file_driver,
2845 "%s/sda" % file_storage_dir))
2846 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2847 iv_name="sdb", logical_id=(file_driver,
2848 "%s/sdb" % file_storage_dir))
2849 disks = [file_sda_dev, file_sdb_dev]
2851 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2855 def _GetInstanceInfoText(instance):
2856 """Compute that text that should be added to the disk's metadata.
2859 return "originstname+%s" % instance.name
2862 def _CreateDisks(cfg, instance):
2863 """Create all disks for an instance.
2865 This abstracts away some work from AddInstance.
2868 instance: the instance object
2871 True or False showing the success of the creation process
2874 info = _GetInstanceInfoText(instance)
2876 if instance.disk_template == constants.DT_FILE:
2877 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2878 result = rpc.call_file_storage_dir_create(instance.primary_node,
2882 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2886 logger.Error("failed to create directory '%s'" % file_storage_dir)
2889 for device in instance.disks:
2890 logger.Info("creating volume %s for instance %s" %
2891 (device.iv_name, instance.name))
2893 for secondary_node in instance.secondary_nodes:
2894 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2895 device, False, info):
2896 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2897 (device.iv_name, device, secondary_node))
2900 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2901 instance, device, info):
2902 logger.Error("failed to create volume %s on primary!" %
2909 def _RemoveDisks(instance, cfg):
2910 """Remove all disks for an instance.
2912 This abstracts away some work from `AddInstance()` and
2913 `RemoveInstance()`. Note that in case some of the devices couldn't
2914 be removed, the removal will continue with the other ones (compare
2915 with `_CreateDisks()`).
2918 instance: the instance object
2921 True or False showing the success of the removal proces
2924 logger.Info("removing block devices for instance %s" % instance.name)
2927 for device in instance.disks:
2928 for node, disk in device.ComputeNodeTree(instance.primary_node):
2929 cfg.SetDiskID(disk, node)
2930 if not rpc.call_blockdev_remove(node, disk):
2931 logger.Error("could not remove block device %s on node %s,"
2932 " continuing anyway" %
2933 (device.iv_name, node))
2936 if instance.disk_template == constants.DT_FILE:
2937 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2938 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2940 logger.Error("could not remove directory '%s'" % file_storage_dir)
2946 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2947 """Compute disk size requirements in the volume group
2949 This is currently hard-coded for the two-drive layout.
2952 # Required free disk space as a function of disk and swap space
2954 constants.DT_DISKLESS: None,
2955 constants.DT_PLAIN: disk_size + swap_size,
2956 # 256 MB are added for drbd metadata, 128MB for each drbd device
2957 constants.DT_DRBD8: disk_size + swap_size + 256,
2958 constants.DT_FILE: None,
2961 if disk_template not in req_size_dict:
2962 raise errors.ProgrammerError("Disk template '%s' size requirement"
2963 " is unknown" % disk_template)
2965 return req_size_dict[disk_template]
2968 class LUCreateInstance(LogicalUnit):
2969 """Create an instance.
2972 HPATH = "instance-add"
2973 HTYPE = constants.HTYPE_INSTANCE
2974 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2975 "disk_template", "swap_size", "mode", "start", "vcpus",
2976 "wait_for_sync", "ip_check", "mac"]
2978 def _RunAllocator(self):
2979 """Run the allocator based on input opcode.
2982 disks = [{"size": self.op.disk_size, "mode": "w"},
2983 {"size": self.op.swap_size, "mode": "w"}]
2984 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2985 "bridge": self.op.bridge}]
2986 ial = IAllocator(self.cfg, self.sstore,
2987 mode=constants.IALLOCATOR_MODE_ALLOC,
2988 name=self.op.instance_name,
2989 disk_template=self.op.disk_template,
2992 vcpus=self.op.vcpus,
2993 mem_size=self.op.mem_size,
2998 ial.Run(self.op.iallocator)
3001 raise errors.OpPrereqError("Can't compute nodes using"
3002 " iallocator '%s': %s" % (self.op.iallocator,
3004 if len(ial.nodes) != ial.required_nodes:
3005 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3006 " of nodes (%s), required %s" %
3007 (len(ial.nodes), ial.required_nodes))
3008 self.op.pnode = ial.nodes[0]
3009 logger.ToStdout("Selected nodes for the instance: %s" %
3010 (", ".join(ial.nodes),))
3011 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3012 (self.op.instance_name, self.op.iallocator, ial.nodes))
3013 if ial.required_nodes == 2:
3014 self.op.snode = ial.nodes[1]
3016 def BuildHooksEnv(self):
3019 This runs on master, primary and secondary nodes of the instance.
3023 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3024 "INSTANCE_DISK_SIZE": self.op.disk_size,
3025 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3026 "INSTANCE_ADD_MODE": self.op.mode,
3028 if self.op.mode == constants.INSTANCE_IMPORT:
3029 env["INSTANCE_SRC_NODE"] = self.op.src_node
3030 env["INSTANCE_SRC_PATH"] = self.op.src_path
3031 env["INSTANCE_SRC_IMAGE"] = self.src_image
3033 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3034 primary_node=self.op.pnode,
3035 secondary_nodes=self.secondaries,
3036 status=self.instance_status,
3037 os_type=self.op.os_type,
3038 memory=self.op.mem_size,
3039 vcpus=self.op.vcpus,
3040 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3043 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3048 def CheckPrereq(self):
3049 """Check prerequisites.
3052 # set optional parameters to none if they don't exist
3053 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3054 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3055 "vnc_bind_address"]:
3056 if not hasattr(self.op, attr):
3057 setattr(self.op, attr, None)
3059 if self.op.mode not in (constants.INSTANCE_CREATE,
3060 constants.INSTANCE_IMPORT):
3061 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3064 if (not self.cfg.GetVGName() and
3065 self.op.disk_template not in constants.DTS_NOT_LVM):
3066 raise errors.OpPrereqError("Cluster does not support lvm-based"
3069 if self.op.mode == constants.INSTANCE_IMPORT:
3070 src_node = getattr(self.op, "src_node", None)
3071 src_path = getattr(self.op, "src_path", None)
3072 if src_node is None or src_path is None:
3073 raise errors.OpPrereqError("Importing an instance requires source"
3074 " node and path options")
3075 src_node_full = self.cfg.ExpandNodeName(src_node)
3076 if src_node_full is None:
3077 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3078 self.op.src_node = src_node = src_node_full
3080 if not os.path.isabs(src_path):
3081 raise errors.OpPrereqError("The source path must be absolute")
3083 export_info = rpc.call_export_info(src_node, src_path)
3086 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3088 if not export_info.has_section(constants.INISECT_EXP):
3089 raise errors.ProgrammerError("Corrupted export config")
3091 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3092 if (int(ei_version) != constants.EXPORT_VERSION):
3093 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3094 (ei_version, constants.EXPORT_VERSION))
3096 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3097 raise errors.OpPrereqError("Can't import instance with more than"
3100 # FIXME: are the old os-es, disk sizes, etc. useful?
3101 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3102 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3104 self.src_image = diskimage
3105 else: # INSTANCE_CREATE
3106 if getattr(self.op, "os_type", None) is None:
3107 raise errors.OpPrereqError("No guest OS specified")
3109 #### instance parameters check
3111 # disk template and mirror node verification
3112 if self.op.disk_template not in constants.DISK_TEMPLATES:
3113 raise errors.OpPrereqError("Invalid disk template name")
3115 # instance name verification
3116 hostname1 = utils.HostInfo(self.op.instance_name)
3118 self.op.instance_name = instance_name = hostname1.name
3119 instance_list = self.cfg.GetInstanceList()
3120 if instance_name in instance_list:
3121 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3124 # ip validity checks
3125 ip = getattr(self.op, "ip", None)
3126 if ip is None or ip.lower() == "none":
3128 elif ip.lower() == "auto":
3129 inst_ip = hostname1.ip
3131 if not utils.IsValidIP(ip):
3132 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3133 " like a valid IP" % ip)
3135 self.inst_ip = self.op.ip = inst_ip
3137 if self.op.start and not self.op.ip_check:
3138 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3139 " adding an instance in start mode")
3141 if self.op.ip_check:
3142 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3143 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3144 (hostname1.ip, instance_name))
3146 # MAC address verification
3147 if self.op.mac != "auto":
3148 if not utils.IsValidMac(self.op.mac.lower()):
3149 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3152 # bridge verification
3153 bridge = getattr(self.op, "bridge", None)
3155 self.op.bridge = self.cfg.GetDefBridge()
3157 self.op.bridge = bridge
3159 # boot order verification
3160 if self.op.hvm_boot_order is not None:
3161 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3162 raise errors.OpPrereqError("invalid boot order specified,"
3163 " must be one or more of [acdn]")
3164 # file storage checks
3165 if (self.op.file_driver and
3166 not self.op.file_driver in constants.FILE_DRIVER):
3167 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3168 self.op.file_driver)
3170 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3171 raise errors.OpPrereqError("File storage directory not a relative"
3175 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3176 raise errors.OpPrereqError("One and only one of iallocator and primary"
3177 " node must be given")
3179 if self.op.iallocator is not None:
3180 self._RunAllocator()
3182 #### node related checks
3184 # check primary node
3185 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3187 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3189 self.op.pnode = pnode.name
3191 self.secondaries = []
3193 # mirror node verification
3194 if self.op.disk_template in constants.DTS_NET_MIRROR:
3195 if getattr(self.op, "snode", None) is None:
3196 raise errors.OpPrereqError("The networked disk templates need"
3199 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3200 if snode_name is None:
3201 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3203 elif snode_name == pnode.name:
3204 raise errors.OpPrereqError("The secondary node cannot be"
3205 " the primary node.")
3206 self.secondaries.append(snode_name)
3208 req_size = _ComputeDiskSize(self.op.disk_template,
3209 self.op.disk_size, self.op.swap_size)
3211 # Check lv size requirements
3212 if req_size is not None:
3213 nodenames = [pnode.name] + self.secondaries
3214 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3215 for node in nodenames:
3216 info = nodeinfo.get(node, None)
3218 raise errors.OpPrereqError("Cannot get current information"
3219 " from node '%s'" % node)
3220 vg_free = info.get('vg_free', None)
3221 if not isinstance(vg_free, int):
3222 raise errors.OpPrereqError("Can't compute free disk space on"
3224 if req_size > info['vg_free']:
3225 raise errors.OpPrereqError("Not enough disk space on target node %s."
3226 " %d MB available, %d MB required" %
3227 (node, info['vg_free'], req_size))
3230 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3232 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3233 " primary node" % self.op.os_type)
3235 if self.op.kernel_path == constants.VALUE_NONE:
3236 raise errors.OpPrereqError("Can't set instance kernel to none")
3239 # bridge check on primary node
3240 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3241 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3242 " destination node '%s'" %
3243 (self.op.bridge, pnode.name))
3245 # memory check on primary node
3247 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3248 "creating instance %s" % self.op.instance_name,
3251 # hvm_cdrom_image_path verification
3252 if self.op.hvm_cdrom_image_path is not None:
3253 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3254 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3255 " be an absolute path or None, not %s" %
3256 self.op.hvm_cdrom_image_path)
3257 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3258 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3259 " regular file or a symlink pointing to"
3260 " an existing regular file, not %s" %
3261 self.op.hvm_cdrom_image_path)
3263 # vnc_bind_address verification
3264 if self.op.vnc_bind_address is not None:
3265 if not utils.IsValidIP(self.op.vnc_bind_address):
3266 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3267 " like a valid IP address" %
3268 self.op.vnc_bind_address)
3271 self.instance_status = 'up'
3273 self.instance_status = 'down'
3275 def Exec(self, feedback_fn):
3276 """Create and add the instance to the cluster.
3279 instance = self.op.instance_name
3280 pnode_name = self.pnode.name
3282 if self.op.mac == "auto":
3283 mac_address = self.cfg.GenerateMAC()
3285 mac_address = self.op.mac
3287 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3288 if self.inst_ip is not None:
3289 nic.ip = self.inst_ip
3291 ht_kind = self.sstore.GetHypervisorType()
3292 if ht_kind in constants.HTS_REQ_PORT:
3293 network_port = self.cfg.AllocatePort()
3297 if self.op.vnc_bind_address is None:
3298 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3300 # this is needed because os.path.join does not accept None arguments
3301 if self.op.file_storage_dir is None:
3302 string_file_storage_dir = ""
3304 string_file_storage_dir = self.op.file_storage_dir
3306 # build the full file storage dir path
3307 file_storage_dir = os.path.normpath(os.path.join(
3308 self.sstore.GetFileStorageDir(),
3309 string_file_storage_dir, instance))
3312 disks = _GenerateDiskTemplate(self.cfg,
3313 self.op.disk_template,
3314 instance, pnode_name,
3315 self.secondaries, self.op.disk_size,
3318 self.op.file_driver)
3320 iobj = objects.Instance(name=instance, os=self.op.os_type,
3321 primary_node=pnode_name,
3322 memory=self.op.mem_size,
3323 vcpus=self.op.vcpus,
3324 nics=[nic], disks=disks,
3325 disk_template=self.op.disk_template,
3326 status=self.instance_status,
3327 network_port=network_port,
3328 kernel_path=self.op.kernel_path,
3329 initrd_path=self.op.initrd_path,
3330 hvm_boot_order=self.op.hvm_boot_order,
3331 hvm_acpi=self.op.hvm_acpi,
3332 hvm_pae=self.op.hvm_pae,
3333 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3334 vnc_bind_address=self.op.vnc_bind_address,
3337 feedback_fn("* creating instance disks...")
3338 if not _CreateDisks(self.cfg, iobj):
3339 _RemoveDisks(iobj, self.cfg)
3340 raise errors.OpExecError("Device creation failed, reverting...")
3342 feedback_fn("adding instance %s to cluster config" % instance)
3344 self.cfg.AddInstance(iobj)
3346 if self.op.wait_for_sync:
3347 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3348 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3349 # make sure the disks are not degraded (still sync-ing is ok)
3351 feedback_fn("* checking mirrors status")
3352 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3357 _RemoveDisks(iobj, self.cfg)
3358 self.cfg.RemoveInstance(iobj.name)
3359 raise errors.OpExecError("There are some degraded disks for"
3362 feedback_fn("creating os for instance %s on node %s" %
3363 (instance, pnode_name))
3365 if iobj.disk_template != constants.DT_DISKLESS:
3366 if self.op.mode == constants.INSTANCE_CREATE:
3367 feedback_fn("* running the instance OS create scripts...")
3368 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3369 raise errors.OpExecError("could not add os for instance %s"
3371 (instance, pnode_name))
3373 elif self.op.mode == constants.INSTANCE_IMPORT:
3374 feedback_fn("* running the instance OS import scripts...")
3375 src_node = self.op.src_node
3376 src_image = self.src_image
3377 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3378 src_node, src_image):
3379 raise errors.OpExecError("Could not import os for instance"
3381 (instance, pnode_name))
3383 # also checked in the prereq part
3384 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3388 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3389 feedback_fn("* starting instance...")
3390 if not rpc.call_instance_start(pnode_name, iobj, None):
3391 raise errors.OpExecError("Could not start instance")
3394 class LUConnectConsole(NoHooksLU):
3395 """Connect to an instance's console.
3397 This is somewhat special in that it returns the command line that
3398 you need to run on the master node in order to connect to the
3402 _OP_REQP = ["instance_name"]
3404 def CheckPrereq(self):
3405 """Check prerequisites.
3407 This checks that the instance is in the cluster.
3410 instance = self.cfg.GetInstanceInfo(
3411 self.cfg.ExpandInstanceName(self.op.instance_name))
3412 if instance is None:
3413 raise errors.OpPrereqError("Instance '%s' not known" %
3414 self.op.instance_name)
3415 self.instance = instance
3417 def Exec(self, feedback_fn):
3418 """Connect to the console of an instance
3421 instance = self.instance
3422 node = instance.primary_node
3424 node_insts = rpc.call_instance_list([node])[node]
3425 if node_insts is False:
3426 raise errors.OpExecError("Can't connect to node %s." % node)
3428 if instance.name not in node_insts:
3429 raise errors.OpExecError("Instance %s is not running." % instance.name)
3431 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3433 hyper = hypervisor.GetHypervisor()
3434 console_cmd = hyper.GetShellCommandForConsole(instance)
3437 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3440 class LUReplaceDisks(LogicalUnit):
3441 """Replace the disks of an instance.
3444 HPATH = "mirrors-replace"
3445 HTYPE = constants.HTYPE_INSTANCE
3446 _OP_REQP = ["instance_name", "mode", "disks"]
3448 def _RunAllocator(self):
3449 """Compute a new secondary node using an IAllocator.
3452 ial = IAllocator(self.cfg, self.sstore,
3453 mode=constants.IALLOCATOR_MODE_RELOC,
3454 name=self.op.instance_name,
3455 relocate_from=[self.sec_node])
3457 ial.Run(self.op.iallocator)
3460 raise errors.OpPrereqError("Can't compute nodes using"
3461 " iallocator '%s': %s" % (self.op.iallocator,
3463 if len(ial.nodes) != ial.required_nodes:
3464 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3465 " of nodes (%s), required %s" %
3466 (len(ial.nodes), ial.required_nodes))
3467 self.op.remote_node = ial.nodes[0]
3468 logger.ToStdout("Selected new secondary for the instance: %s" %
3469 self.op.remote_node)
3471 def BuildHooksEnv(self):
3474 This runs on the master, the primary and all the secondaries.
3478 "MODE": self.op.mode,
3479 "NEW_SECONDARY": self.op.remote_node,
3480 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3482 env.update(_BuildInstanceHookEnvByObject(self.instance))
3484 self.sstore.GetMasterNode(),
3485 self.instance.primary_node,
3487 if self.op.remote_node is not None:
3488 nl.append(self.op.remote_node)
3491 def CheckPrereq(self):
3492 """Check prerequisites.
3494 This checks that the instance is in the cluster.
3497 if not hasattr(self.op, "remote_node"):
3498 self.op.remote_node = None
3500 instance = self.cfg.GetInstanceInfo(
3501 self.cfg.ExpandInstanceName(self.op.instance_name))
3502 if instance is None:
3503 raise errors.OpPrereqError("Instance '%s' not known" %
3504 self.op.instance_name)
3505 self.instance = instance
3506 self.op.instance_name = instance.name
3508 if instance.disk_template not in constants.DTS_NET_MIRROR:
3509 raise errors.OpPrereqError("Instance's disk layout is not"
3510 " network mirrored.")
3512 if len(instance.secondary_nodes) != 1:
3513 raise errors.OpPrereqError("The instance has a strange layout,"
3514 " expected one secondary but found %d" %
3515 len(instance.secondary_nodes))
3517 self.sec_node = instance.secondary_nodes[0]
3519 ia_name = getattr(self.op, "iallocator", None)
3520 if ia_name is not None:
3521 if self.op.remote_node is not None:
3522 raise errors.OpPrereqError("Give either the iallocator or the new"
3523 " secondary, not both")
3524 self.op.remote_node = self._RunAllocator()
3526 remote_node = self.op.remote_node
3527 if remote_node is not None:
3528 remote_node = self.cfg.ExpandNodeName(remote_node)
3529 if remote_node is None:
3530 raise errors.OpPrereqError("Node '%s' not known" %
3531 self.op.remote_node)
3532 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3534 self.remote_node_info = None
3535 if remote_node == instance.primary_node:
3536 raise errors.OpPrereqError("The specified node is the primary node of"
3538 elif remote_node == self.sec_node:
3539 if self.op.mode == constants.REPLACE_DISK_SEC:
3540 # this is for DRBD8, where we can't execute the same mode of
3541 # replacement as for drbd7 (no different port allocated)
3542 raise errors.OpPrereqError("Same secondary given, cannot execute"
3544 if instance.disk_template == constants.DT_DRBD8:
3545 if (self.op.mode == constants.REPLACE_DISK_ALL and
3546 remote_node is not None):
3547 # switch to replace secondary mode
3548 self.op.mode = constants.REPLACE_DISK_SEC
3550 if self.op.mode == constants.REPLACE_DISK_ALL:
3551 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3552 " secondary disk replacement, not"
3554 elif self.op.mode == constants.REPLACE_DISK_PRI:
3555 if remote_node is not None:
3556 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3557 " the secondary while doing a primary"
3558 " node disk replacement")
3559 self.tgt_node = instance.primary_node
3560 self.oth_node = instance.secondary_nodes[0]
3561 elif self.op.mode == constants.REPLACE_DISK_SEC:
3562 self.new_node = remote_node # this can be None, in which case
3563 # we don't change the secondary
3564 self.tgt_node = instance.secondary_nodes[0]
3565 self.oth_node = instance.primary_node
3567 raise errors.ProgrammerError("Unhandled disk replace mode")
3569 for name in self.op.disks:
3570 if instance.FindDisk(name) is None:
3571 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3572 (name, instance.name))
3573 self.op.remote_node = remote_node
3575 def _ExecD8DiskOnly(self, feedback_fn):
3576 """Replace a disk on the primary or secondary for dbrd8.
3578 The algorithm for replace is quite complicated:
3579 - for each disk to be replaced:
3580 - create new LVs on the target node with unique names
3581 - detach old LVs from the drbd device
3582 - rename old LVs to name_replaced.<time_t>
3583 - rename new LVs to old LVs
3584 - attach the new LVs (with the old names now) to the drbd device
3585 - wait for sync across all devices
3586 - for each modified disk:
3587 - remove old LVs (which have the name name_replaces.<time_t>)
3589 Failures are not very well handled.
3593 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3594 instance = self.instance
3596 vgname = self.cfg.GetVGName()
3599 tgt_node = self.tgt_node
3600 oth_node = self.oth_node
3602 # Step: check device activation
3603 self.proc.LogStep(1, steps_total, "check device existence")
3604 info("checking volume groups")
3605 my_vg = cfg.GetVGName()
3606 results = rpc.call_vg_list([oth_node, tgt_node])
3608 raise errors.OpExecError("Can't list volume groups on the nodes")
3609 for node in oth_node, tgt_node:
3610 res = results.get(node, False)
3611 if not res or my_vg not in res:
3612 raise errors.OpExecError("Volume group '%s' not found on %s" %
3614 for dev in instance.disks:
3615 if not dev.iv_name in self.op.disks:
3617 for node in tgt_node, oth_node:
3618 info("checking %s on %s" % (dev.iv_name, node))
3619 cfg.SetDiskID(dev, node)
3620 if not rpc.call_blockdev_find(node, dev):
3621 raise errors.OpExecError("Can't find device %s on node %s" %
3622 (dev.iv_name, node))
3624 # Step: check other node consistency
3625 self.proc.LogStep(2, steps_total, "check peer consistency")
3626 for dev in instance.disks:
3627 if not dev.iv_name in self.op.disks:
3629 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3630 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3631 oth_node==instance.primary_node):
3632 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3633 " to replace disks on this node (%s)" %
3634 (oth_node, tgt_node))
3636 # Step: create new storage
3637 self.proc.LogStep(3, steps_total, "allocate new storage")
3638 for dev in instance.disks:
3639 if not dev.iv_name in self.op.disks:
3642 cfg.SetDiskID(dev, tgt_node)
3643 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3644 names = _GenerateUniqueNames(cfg, lv_names)
3645 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3646 logical_id=(vgname, names[0]))
3647 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3648 logical_id=(vgname, names[1]))
3649 new_lvs = [lv_data, lv_meta]
3650 old_lvs = dev.children
3651 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3652 info("creating new local storage on %s for %s" %
3653 (tgt_node, dev.iv_name))
3654 # since we *always* want to create this LV, we use the
3655 # _Create...OnPrimary (which forces the creation), even if we
3656 # are talking about the secondary node
3657 for new_lv in new_lvs:
3658 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3659 _GetInstanceInfoText(instance)):
3660 raise errors.OpExecError("Failed to create new LV named '%s' on"
3662 (new_lv.logical_id[1], tgt_node))
3664 # Step: for each lv, detach+rename*2+attach
3665 self.proc.LogStep(4, steps_total, "change drbd configuration")
3666 for dev, old_lvs, new_lvs in iv_names.itervalues():
3667 info("detaching %s drbd from local storage" % dev.iv_name)
3668 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3669 raise errors.OpExecError("Can't detach drbd from local storage on node"
3670 " %s for device %s" % (tgt_node, dev.iv_name))
3672 #cfg.Update(instance)
3674 # ok, we created the new LVs, so now we know we have the needed
3675 # storage; as such, we proceed on the target node to rename
3676 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3677 # using the assumption that logical_id == physical_id (which in
3678 # turn is the unique_id on that node)
3680 # FIXME(iustin): use a better name for the replaced LVs
3681 temp_suffix = int(time.time())
3682 ren_fn = lambda d, suff: (d.physical_id[0],
3683 d.physical_id[1] + "_replaced-%s" % suff)
3684 # build the rename list based on what LVs exist on the node
3686 for to_ren in old_lvs:
3687 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3688 if find_res is not None: # device exists
3689 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3691 info("renaming the old LVs on the target node")
3692 if not rpc.call_blockdev_rename(tgt_node, rlist):
3693 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3694 # now we rename the new LVs to the old LVs
3695 info("renaming the new LVs on the target node")
3696 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3697 if not rpc.call_blockdev_rename(tgt_node, rlist):
3698 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3700 for old, new in zip(old_lvs, new_lvs):
3701 new.logical_id = old.logical_id
3702 cfg.SetDiskID(new, tgt_node)
3704 for disk in old_lvs:
3705 disk.logical_id = ren_fn(disk, temp_suffix)
3706 cfg.SetDiskID(disk, tgt_node)
3708 # now that the new lvs have the old name, we can add them to the device
3709 info("adding new mirror component on %s" % tgt_node)
3710 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3711 for new_lv in new_lvs:
3712 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3713 warning("Can't rollback device %s", hint="manually cleanup unused"
3715 raise errors.OpExecError("Can't add local storage to drbd")
3717 dev.children = new_lvs
3718 cfg.Update(instance)
3720 # Step: wait for sync
3722 # this can fail as the old devices are degraded and _WaitForSync
3723 # does a combined result over all disks, so we don't check its
3725 self.proc.LogStep(5, steps_total, "sync devices")
3726 _WaitForSync(cfg, instance, self.proc, unlock=True)
3728 # so check manually all the devices
3729 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3730 cfg.SetDiskID(dev, instance.primary_node)
3731 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3733 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3735 # Step: remove old storage
3736 self.proc.LogStep(6, steps_total, "removing old storage")
3737 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3738 info("remove logical volumes for %s" % name)
3740 cfg.SetDiskID(lv, tgt_node)
3741 if not rpc.call_blockdev_remove(tgt_node, lv):
3742 warning("Can't remove old LV", hint="manually remove unused LVs")
3745 def _ExecD8Secondary(self, feedback_fn):
3746 """Replace the secondary node for drbd8.
3748 The algorithm for replace is quite complicated:
3749 - for all disks of the instance:
3750 - create new LVs on the new node with same names
3751 - shutdown the drbd device on the old secondary
3752 - disconnect the drbd network on the primary
3753 - create the drbd device on the new secondary
3754 - network attach the drbd on the primary, using an artifice:
3755 the drbd code for Attach() will connect to the network if it
3756 finds a device which is connected to the good local disks but
3758 - wait for sync across all devices
3759 - remove all disks from the old secondary
3761 Failures are not very well handled.
3765 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3766 instance = self.instance
3768 vgname = self.cfg.GetVGName()
3771 old_node = self.tgt_node
3772 new_node = self.new_node
3773 pri_node = instance.primary_node
3775 # Step: check device activation
3776 self.proc.LogStep(1, steps_total, "check device existence")
3777 info("checking volume groups")
3778 my_vg = cfg.GetVGName()
3779 results = rpc.call_vg_list([pri_node, new_node])
3781 raise errors.OpExecError("Can't list volume groups on the nodes")
3782 for node in pri_node, new_node:
3783 res = results.get(node, False)
3784 if not res or my_vg not in res:
3785 raise errors.OpExecError("Volume group '%s' not found on %s" %
3787 for dev in instance.disks:
3788 if not dev.iv_name in self.op.disks:
3790 info("checking %s on %s" % (dev.iv_name, pri_node))
3791 cfg.SetDiskID(dev, pri_node)
3792 if not rpc.call_blockdev_find(pri_node, dev):
3793 raise errors.OpExecError("Can't find device %s on node %s" %
3794 (dev.iv_name, pri_node))
3796 # Step: check other node consistency
3797 self.proc.LogStep(2, steps_total, "check peer consistency")
3798 for dev in instance.disks:
3799 if not dev.iv_name in self.op.disks:
3801 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3802 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3803 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3804 " unsafe to replace the secondary" %
3807 # Step: create new storage
3808 self.proc.LogStep(3, steps_total, "allocate new storage")
3809 for dev in instance.disks:
3811 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3812 # since we *always* want to create this LV, we use the
3813 # _Create...OnPrimary (which forces the creation), even if we
3814 # are talking about the secondary node
3815 for new_lv in dev.children:
3816 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3817 _GetInstanceInfoText(instance)):
3818 raise errors.OpExecError("Failed to create new LV named '%s' on"
3820 (new_lv.logical_id[1], new_node))
3822 iv_names[dev.iv_name] = (dev, dev.children)
3824 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3825 for dev in instance.disks:
3827 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3828 # create new devices on new_node
3829 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3830 logical_id=(pri_node, new_node,
3832 children=dev.children)
3833 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3835 _GetInstanceInfoText(instance)):
3836 raise errors.OpExecError("Failed to create new DRBD on"
3837 " node '%s'" % new_node)
3839 for dev in instance.disks:
3840 # we have new devices, shutdown the drbd on the old secondary
3841 info("shutting down drbd for %s on old node" % dev.iv_name)
3842 cfg.SetDiskID(dev, old_node)
3843 if not rpc.call_blockdev_shutdown(old_node, dev):
3844 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3845 hint="Please cleanup this device manually as soon as possible")
3847 info("detaching primary drbds from the network (=> standalone)")
3849 for dev in instance.disks:
3850 cfg.SetDiskID(dev, pri_node)
3851 # set the physical (unique in bdev terms) id to None, meaning
3852 # detach from network
3853 dev.physical_id = (None,) * len(dev.physical_id)
3854 # and 'find' the device, which will 'fix' it to match the
3856 if rpc.call_blockdev_find(pri_node, dev):
3859 warning("Failed to detach drbd %s from network, unusual case" %
3863 # no detaches succeeded (very unlikely)
3864 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3866 # if we managed to detach at least one, we update all the disks of
3867 # the instance to point to the new secondary
3868 info("updating instance configuration")
3869 for dev in instance.disks:
3870 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3871 cfg.SetDiskID(dev, pri_node)
3872 cfg.Update(instance)
3874 # and now perform the drbd attach
3875 info("attaching primary drbds to new secondary (standalone => connected)")
3877 for dev in instance.disks:
3878 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3879 # since the attach is smart, it's enough to 'find' the device,
3880 # it will automatically activate the network, if the physical_id
3882 cfg.SetDiskID(dev, pri_node)
3883 if not rpc.call_blockdev_find(pri_node, dev):
3884 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3885 "please do a gnt-instance info to see the status of disks")
3887 # this can fail as the old devices are degraded and _WaitForSync
3888 # does a combined result over all disks, so we don't check its
3890 self.proc.LogStep(5, steps_total, "sync devices")
3891 _WaitForSync(cfg, instance, self.proc, unlock=True)
3893 # so check manually all the devices
3894 for name, (dev, old_lvs) in iv_names.iteritems():
3895 cfg.SetDiskID(dev, pri_node)
3896 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3898 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3900 self.proc.LogStep(6, steps_total, "removing old storage")
3901 for name, (dev, old_lvs) in iv_names.iteritems():
3902 info("remove logical volumes for %s" % name)
3904 cfg.SetDiskID(lv, old_node)
3905 if not rpc.call_blockdev_remove(old_node, lv):
3906 warning("Can't remove LV on old secondary",
3907 hint="Cleanup stale volumes by hand")
3909 def Exec(self, feedback_fn):
3910 """Execute disk replacement.
3912 This dispatches the disk replacement to the appropriate handler.
3915 instance = self.instance
3917 # Activate the instance disks if we're replacing them on a down instance
3918 if instance.status == "down":
3919 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3920 self.proc.ChainOpCode(op)
3922 if instance.disk_template == constants.DT_DRBD8:
3923 if self.op.remote_node is None:
3924 fn = self._ExecD8DiskOnly
3926 fn = self._ExecD8Secondary
3928 raise errors.ProgrammerError("Unhandled disk replacement case")
3930 ret = fn(feedback_fn)
3932 # Deactivate the instance disks if we're replacing them on a down instance
3933 if instance.status == "down":
3934 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3935 self.proc.ChainOpCode(op)
3940 class LUQueryInstanceData(NoHooksLU):
3941 """Query runtime instance data.
3944 _OP_REQP = ["instances"]
3946 def CheckPrereq(self):
3947 """Check prerequisites.
3949 This only checks the optional instance list against the existing names.
3952 if not isinstance(self.op.instances, list):
3953 raise errors.OpPrereqError("Invalid argument type 'instances'")
3954 if self.op.instances:
3955 self.wanted_instances = []
3956 names = self.op.instances
3958 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3959 if instance is None:
3960 raise errors.OpPrereqError("No such instance name '%s'" % name)
3961 self.wanted_instances.append(instance)
3963 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
3964 in self.cfg.GetInstanceList()]
3968 def _ComputeDiskStatus(self, instance, snode, dev):
3969 """Compute block device status.
3972 self.cfg.SetDiskID(dev, instance.primary_node)
3973 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
3974 if dev.dev_type in constants.LDS_DRBD:
3975 # we change the snode then (otherwise we use the one passed in)
3976 if dev.logical_id[0] == instance.primary_node:
3977 snode = dev.logical_id[1]
3979 snode = dev.logical_id[0]
3982 self.cfg.SetDiskID(dev, snode)
3983 dev_sstatus = rpc.call_blockdev_find(snode, dev)
3988 dev_children = [self._ComputeDiskStatus(instance, snode, child)
3989 for child in dev.children]
3994 "iv_name": dev.iv_name,
3995 "dev_type": dev.dev_type,
3996 "logical_id": dev.logical_id,
3997 "physical_id": dev.physical_id,
3998 "pstatus": dev_pstatus,
3999 "sstatus": dev_sstatus,
4000 "children": dev_children,
4005 def Exec(self, feedback_fn):
4006 """Gather and return data"""
4008 for instance in self.wanted_instances:
4009 remote_info = rpc.call_instance_info(instance.primary_node,
4011 if remote_info and "state" in remote_info:
4014 remote_state = "down"
4015 if instance.status == "down":
4016 config_state = "down"
4020 disks = [self._ComputeDiskStatus(instance, None, device)
4021 for device in instance.disks]
4024 "name": instance.name,
4025 "config_state": config_state,
4026 "run_state": remote_state,
4027 "pnode": instance.primary_node,
4028 "snodes": instance.secondary_nodes,
4030 "memory": instance.memory,
4031 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4033 "vcpus": instance.vcpus,
4036 htkind = self.sstore.GetHypervisorType()
4037 if htkind == constants.HT_XEN_PVM30:
4038 idict["kernel_path"] = instance.kernel_path
4039 idict["initrd_path"] = instance.initrd_path
4041 if htkind == constants.HT_XEN_HVM31:
4042 idict["hvm_boot_order"] = instance.hvm_boot_order
4043 idict["hvm_acpi"] = instance.hvm_acpi
4044 idict["hvm_pae"] = instance.hvm_pae
4045 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4047 if htkind in constants.HTS_REQ_PORT:
4048 idict["vnc_bind_address"] = instance.vnc_bind_address
4049 idict["network_port"] = instance.network_port
4051 result[instance.name] = idict
4056 class LUSetInstanceParams(LogicalUnit):
4057 """Modifies an instances's parameters.
4060 HPATH = "instance-modify"
4061 HTYPE = constants.HTYPE_INSTANCE
4062 _OP_REQP = ["instance_name"]
4064 def BuildHooksEnv(self):
4067 This runs on the master, primary and secondaries.
4072 args['memory'] = self.mem
4074 args['vcpus'] = self.vcpus
4075 if self.do_ip or self.do_bridge or self.mac:
4079 ip = self.instance.nics[0].ip
4081 bridge = self.bridge
4083 bridge = self.instance.nics[0].bridge
4087 mac = self.instance.nics[0].mac
4088 args['nics'] = [(ip, bridge, mac)]
4089 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4090 nl = [self.sstore.GetMasterNode(),
4091 self.instance.primary_node] + list(self.instance.secondary_nodes)
4094 def CheckPrereq(self):
4095 """Check prerequisites.
4097 This only checks the instance list against the existing names.
4100 self.mem = getattr(self.op, "mem", None)
4101 self.vcpus = getattr(self.op, "vcpus", None)
4102 self.ip = getattr(self.op, "ip", None)
4103 self.mac = getattr(self.op, "mac", None)
4104 self.bridge = getattr(self.op, "bridge", None)
4105 self.kernel_path = getattr(self.op, "kernel_path", None)
4106 self.initrd_path = getattr(self.op, "initrd_path", None)
4107 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4108 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4109 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4110 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4111 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4112 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4113 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4114 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4115 self.vnc_bind_address]
4116 if all_parms.count(None) == len(all_parms):
4117 raise errors.OpPrereqError("No changes submitted")
4118 if self.mem is not None:
4120 self.mem = int(self.mem)
4121 except ValueError, err:
4122 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4123 if self.vcpus is not None:
4125 self.vcpus = int(self.vcpus)
4126 except ValueError, err:
4127 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4128 if self.ip is not None:
4130 if self.ip.lower() == "none":
4133 if not utils.IsValidIP(self.ip):
4134 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4137 self.do_bridge = (self.bridge is not None)
4138 if self.mac is not None:
4139 if self.cfg.IsMacInUse(self.mac):
4140 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4142 if not utils.IsValidMac(self.mac):
4143 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4145 if self.kernel_path is not None:
4146 self.do_kernel_path = True
4147 if self.kernel_path == constants.VALUE_NONE:
4148 raise errors.OpPrereqError("Can't set instance to no kernel")
4150 if self.kernel_path != constants.VALUE_DEFAULT:
4151 if not os.path.isabs(self.kernel_path):
4152 raise errors.OpPrereqError("The kernel path must be an absolute"
4155 self.do_kernel_path = False
4157 if self.initrd_path is not None:
4158 self.do_initrd_path = True
4159 if self.initrd_path not in (constants.VALUE_NONE,
4160 constants.VALUE_DEFAULT):
4161 if not os.path.isabs(self.initrd_path):
4162 raise errors.OpPrereqError("The initrd path must be an absolute"
4165 self.do_initrd_path = False
4167 # boot order verification
4168 if self.hvm_boot_order is not None:
4169 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4170 if len(self.hvm_boot_order.strip("acdn")) != 0:
4171 raise errors.OpPrereqError("invalid boot order specified,"
4172 " must be one or more of [acdn]"
4175 # hvm_cdrom_image_path verification
4176 if self.op.hvm_cdrom_image_path is not None:
4177 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4178 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4179 " be an absolute path or None, not %s" %
4180 self.op.hvm_cdrom_image_path)
4181 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4182 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4183 " regular file or a symlink pointing to"
4184 " an existing regular file, not %s" %
4185 self.op.hvm_cdrom_image_path)
4187 # vnc_bind_address verification
4188 if self.op.vnc_bind_address is not None:
4189 if not utils.IsValidIP(self.op.vnc_bind_address):
4190 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4191 " like a valid IP address" %
4192 self.op.vnc_bind_address)
4194 instance = self.cfg.GetInstanceInfo(
4195 self.cfg.ExpandInstanceName(self.op.instance_name))
4196 if instance is None:
4197 raise errors.OpPrereqError("No such instance name '%s'" %
4198 self.op.instance_name)
4199 self.op.instance_name = instance.name
4200 self.instance = instance
4203 def Exec(self, feedback_fn):
4204 """Modifies an instance.
4206 All parameters take effect only at the next restart of the instance.
4209 instance = self.instance
4211 instance.memory = self.mem
4212 result.append(("mem", self.mem))
4214 instance.vcpus = self.vcpus
4215 result.append(("vcpus", self.vcpus))
4217 instance.nics[0].ip = self.ip
4218 result.append(("ip", self.ip))
4220 instance.nics[0].bridge = self.bridge
4221 result.append(("bridge", self.bridge))
4223 instance.nics[0].mac = self.mac
4224 result.append(("mac", self.mac))
4225 if self.do_kernel_path:
4226 instance.kernel_path = self.kernel_path
4227 result.append(("kernel_path", self.kernel_path))
4228 if self.do_initrd_path:
4229 instance.initrd_path = self.initrd_path
4230 result.append(("initrd_path", self.initrd_path))
4231 if self.hvm_boot_order:
4232 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4233 instance.hvm_boot_order = None
4235 instance.hvm_boot_order = self.hvm_boot_order
4236 result.append(("hvm_boot_order", self.hvm_boot_order))
4238 instance.hvm_acpi = self.hvm_acpi
4239 result.append(("hvm_acpi", self.hvm_acpi))
4241 instance.hvm_pae = self.hvm_pae
4242 result.append(("hvm_pae", self.hvm_pae))
4243 if self.hvm_cdrom_image_path:
4244 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4245 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4246 if self.vnc_bind_address:
4247 instance.vnc_bind_address = self.vnc_bind_address
4248 result.append(("vnc_bind_address", self.vnc_bind_address))
4250 self.cfg.AddInstance(instance)
4255 class LUQueryExports(NoHooksLU):
4256 """Query the exports list
4261 def CheckPrereq(self):
4262 """Check that the nodelist contains only existing nodes.
4265 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4267 def Exec(self, feedback_fn):
4268 """Compute the list of all the exported system images.
4271 a dictionary with the structure node->(export-list)
4272 where export-list is a list of the instances exported on
4276 return rpc.call_export_list(self.nodes)
4279 class LUExportInstance(LogicalUnit):
4280 """Export an instance to an image in the cluster.
4283 HPATH = "instance-export"
4284 HTYPE = constants.HTYPE_INSTANCE
4285 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4287 def BuildHooksEnv(self):
4290 This will run on the master, primary node and target node.
4294 "EXPORT_NODE": self.op.target_node,
4295 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4297 env.update(_BuildInstanceHookEnvByObject(self.instance))
4298 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4299 self.op.target_node]
4302 def CheckPrereq(self):
4303 """Check prerequisites.
4305 This checks that the instance and node names are valid.
4308 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4309 self.instance = self.cfg.GetInstanceInfo(instance_name)
4310 if self.instance is None:
4311 raise errors.OpPrereqError("Instance '%s' not found" %
4312 self.op.instance_name)
4315 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4316 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4318 if self.dst_node is None:
4319 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4320 self.op.target_node)
4321 self.op.target_node = self.dst_node.name
4323 # instance disk type verification
4324 for disk in self.instance.disks:
4325 if disk.dev_type == constants.LD_FILE:
4326 raise errors.OpPrereqError("Export not supported for instances with"
4327 " file-based disks")
4329 def Exec(self, feedback_fn):
4330 """Export an instance to an image in the cluster.
4333 instance = self.instance
4334 dst_node = self.dst_node
4335 src_node = instance.primary_node
4336 if self.op.shutdown:
4337 # shutdown the instance, but not the disks
4338 if not rpc.call_instance_shutdown(src_node, instance):
4339 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4340 (instance.name, src_node))
4342 vgname = self.cfg.GetVGName()
4347 for disk in instance.disks:
4348 if disk.iv_name == "sda":
4349 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4350 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4352 if not new_dev_name:
4353 logger.Error("could not snapshot block device %s on node %s" %
4354 (disk.logical_id[1], src_node))
4356 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4357 logical_id=(vgname, new_dev_name),
4358 physical_id=(vgname, new_dev_name),
4359 iv_name=disk.iv_name)
4360 snap_disks.append(new_dev)
4363 if self.op.shutdown and instance.status == "up":
4364 if not rpc.call_instance_start(src_node, instance, None):
4365 _ShutdownInstanceDisks(instance, self.cfg)
4366 raise errors.OpExecError("Could not start instance")
4368 # TODO: check for size
4370 for dev in snap_disks:
4371 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4372 logger.Error("could not export block device %s from node %s to node %s"
4373 % (dev.logical_id[1], src_node, dst_node.name))
4374 if not rpc.call_blockdev_remove(src_node, dev):
4375 logger.Error("could not remove snapshot block device %s from node %s" %
4376 (dev.logical_id[1], src_node))
4378 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4379 logger.Error("could not finalize export for instance %s on node %s" %
4380 (instance.name, dst_node.name))
4382 nodelist = self.cfg.GetNodeList()
4383 nodelist.remove(dst_node.name)
4385 # on one-node clusters nodelist will be empty after the removal
4386 # if we proceed the backup would be removed because OpQueryExports
4387 # substitutes an empty list with the full cluster node list.
4389 op = opcodes.OpQueryExports(nodes=nodelist)
4390 exportlist = self.proc.ChainOpCode(op)
4391 for node in exportlist:
4392 if instance.name in exportlist[node]:
4393 if not rpc.call_export_remove(node, instance.name):
4394 logger.Error("could not remove older export for instance %s"
4395 " on node %s" % (instance.name, node))
4398 class LURemoveExport(NoHooksLU):
4399 """Remove exports related to the named instance.
4402 _OP_REQP = ["instance_name"]
4404 def CheckPrereq(self):
4405 """Check prerequisites.
4409 def Exec(self, feedback_fn):
4410 """Remove any export.
4413 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4414 # If the instance was not found we'll try with the name that was passed in.
4415 # This will only work if it was an FQDN, though.
4417 if not instance_name:
4419 instance_name = self.op.instance_name
4421 op = opcodes.OpQueryExports(nodes=[])
4422 exportlist = self.proc.ChainOpCode(op)
4424 for node in exportlist:
4425 if instance_name in exportlist[node]:
4427 if not rpc.call_export_remove(node, instance_name):
4428 logger.Error("could not remove export for instance %s"
4429 " on node %s" % (instance_name, node))
4431 if fqdn_warn and not found:
4432 feedback_fn("Export not found. If trying to remove an export belonging"
4433 " to a deleted instance please use its Fully Qualified"
4437 class TagsLU(NoHooksLU):
4440 This is an abstract class which is the parent of all the other tags LUs.
4443 def CheckPrereq(self):
4444 """Check prerequisites.
4447 if self.op.kind == constants.TAG_CLUSTER:
4448 self.target = self.cfg.GetClusterInfo()
4449 elif self.op.kind == constants.TAG_NODE:
4450 name = self.cfg.ExpandNodeName(self.op.name)
4452 raise errors.OpPrereqError("Invalid node name (%s)" %
4455 self.target = self.cfg.GetNodeInfo(name)
4456 elif self.op.kind == constants.TAG_INSTANCE:
4457 name = self.cfg.ExpandInstanceName(self.op.name)
4459 raise errors.OpPrereqError("Invalid instance name (%s)" %
4462 self.target = self.cfg.GetInstanceInfo(name)
4464 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4468 class LUGetTags(TagsLU):
4469 """Returns the tags of a given object.
4472 _OP_REQP = ["kind", "name"]
4474 def Exec(self, feedback_fn):
4475 """Returns the tag list.
4478 return self.target.GetTags()
4481 class LUSearchTags(NoHooksLU):
4482 """Searches the tags for a given pattern.
4485 _OP_REQP = ["pattern"]
4487 def CheckPrereq(self):
4488 """Check prerequisites.
4490 This checks the pattern passed for validity by compiling it.
4494 self.re = re.compile(self.op.pattern)
4495 except re.error, err:
4496 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4497 (self.op.pattern, err))
4499 def Exec(self, feedback_fn):
4500 """Returns the tag list.
4504 tgts = [("/cluster", cfg.GetClusterInfo())]
4505 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4506 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4507 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4508 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4510 for path, target in tgts:
4511 for tag in target.GetTags():
4512 if self.re.search(tag):
4513 results.append((path, tag))
4517 class LUAddTags(TagsLU):
4518 """Sets a tag on a given object.
4521 _OP_REQP = ["kind", "name", "tags"]
4523 def CheckPrereq(self):
4524 """Check prerequisites.
4526 This checks the type and length of the tag name and value.
4529 TagsLU.CheckPrereq(self)
4530 for tag in self.op.tags:
4531 objects.TaggableObject.ValidateTag(tag)
4533 def Exec(self, feedback_fn):
4538 for tag in self.op.tags:
4539 self.target.AddTag(tag)
4540 except errors.TagError, err:
4541 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4543 self.cfg.Update(self.target)
4544 except errors.ConfigurationError:
4545 raise errors.OpRetryError("There has been a modification to the"
4546 " config file and the operation has been"
4547 " aborted. Please retry.")
4550 class LUDelTags(TagsLU):
4551 """Delete a list of tags from a given object.
4554 _OP_REQP = ["kind", "name", "tags"]
4556 def CheckPrereq(self):
4557 """Check prerequisites.
4559 This checks that we have the given tag.
4562 TagsLU.CheckPrereq(self)
4563 for tag in self.op.tags:
4564 objects.TaggableObject.ValidateTag(tag)
4565 del_tags = frozenset(self.op.tags)
4566 cur_tags = self.target.GetTags()
4567 if not del_tags <= cur_tags:
4568 diff_tags = del_tags - cur_tags
4569 diff_names = ["'%s'" % tag for tag in diff_tags]
4571 raise errors.OpPrereqError("Tag(s) %s not found" %
4572 (",".join(diff_names)))
4574 def Exec(self, feedback_fn):
4575 """Remove the tag from the object.
4578 for tag in self.op.tags:
4579 self.target.RemoveTag(tag)
4581 self.cfg.Update(self.target)
4582 except errors.ConfigurationError:
4583 raise errors.OpRetryError("There has been a modification to the"
4584 " config file and the operation has been"
4585 " aborted. Please retry.")
4587 class LUTestDelay(NoHooksLU):
4588 """Sleep for a specified amount of time.
4590 This LU sleeps on the master and/or nodes for a specified amoutn of
4594 _OP_REQP = ["duration", "on_master", "on_nodes"]
4596 def CheckPrereq(self):
4597 """Check prerequisites.
4599 This checks that we have a good list of nodes and/or the duration
4604 if self.op.on_nodes:
4605 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4607 def Exec(self, feedback_fn):
4608 """Do the actual sleep.
4611 if self.op.on_master:
4612 if not utils.TestDelay(self.op.duration):
4613 raise errors.OpExecError("Error during master delay test")
4614 if self.op.on_nodes:
4615 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4617 raise errors.OpExecError("Complete failure from rpc call")
4618 for node, node_result in result.items():
4620 raise errors.OpExecError("Failure during rpc call to node %s,"
4621 " result: %s" % (node, node_result))
4624 class IAllocator(object):
4625 """IAllocator framework.
4627 An IAllocator instance has three sets of attributes:
4628 - cfg/sstore that are needed to query the cluster
4629 - input data (all members of the _KEYS class attribute are required)
4630 - four buffer attributes (in|out_data|text), that represent the
4631 input (to the external script) in text and data structure format,
4632 and the output from it, again in two formats
4633 - the result variables from the script (success, info, nodes) for
4638 "mem_size", "disks", "disk_template",
4639 "os", "tags", "nics", "vcpus",
4645 def __init__(self, cfg, sstore, mode, name, **kwargs):
4647 self.sstore = sstore
4648 # init buffer variables
4649 self.in_text = self.out_text = self.in_data = self.out_data = None
4650 # init all input fields so that pylint is happy
4653 self.mem_size = self.disks = self.disk_template = None
4654 self.os = self.tags = self.nics = self.vcpus = None
4655 self.relocate_from = None
4657 self.required_nodes = None
4658 # init result fields
4659 self.success = self.info = self.nodes = None
4660 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4661 keyset = self._ALLO_KEYS
4662 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4663 keyset = self._RELO_KEYS
4665 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4666 " IAllocator" % self.mode)
4668 if key not in keyset:
4669 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4670 " IAllocator" % key)
4671 setattr(self, key, kwargs[key])
4673 if key not in kwargs:
4674 raise errors.ProgrammerError("Missing input parameter '%s' to"
4675 " IAllocator" % key)
4676 self._BuildInputData()
4678 def _ComputeClusterData(self):
4679 """Compute the generic allocator input data.
4681 This is the data that is independent of the actual operation.
4688 "cluster_name": self.sstore.GetClusterName(),
4689 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4690 "hypervisor_type": self.sstore.GetHypervisorType(),
4691 # we don't have job IDs
4694 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4698 node_list = cfg.GetNodeList()
4699 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4700 for nname in node_list:
4701 ninfo = cfg.GetNodeInfo(nname)
4702 if nname not in node_data or not isinstance(node_data[nname], dict):
4703 raise errors.OpExecError("Can't get data for node %s" % nname)
4704 remote_info = node_data[nname]
4705 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4706 'vg_size', 'vg_free', 'cpu_total']:
4707 if attr not in remote_info:
4708 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4711 remote_info[attr] = int(remote_info[attr])
4712 except ValueError, err:
4713 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4714 " %s" % (nname, attr, str(err)))
4715 # compute memory used by primary instances
4716 i_p_mem = i_p_up_mem = 0
4717 for iinfo in i_list:
4718 if iinfo.primary_node == nname:
4719 i_p_mem += iinfo.memory
4720 if iinfo.status == "up":
4721 i_p_up_mem += iinfo.memory
4723 # compute memory used by instances
4725 "tags": list(ninfo.GetTags()),
4726 "total_memory": remote_info['memory_total'],
4727 "reserved_memory": remote_info['memory_dom0'],
4728 "free_memory": remote_info['memory_free'],
4729 "i_pri_memory": i_p_mem,
4730 "i_pri_up_memory": i_p_up_mem,
4731 "total_disk": remote_info['vg_size'],
4732 "free_disk": remote_info['vg_free'],
4733 "primary_ip": ninfo.primary_ip,
4734 "secondary_ip": ninfo.secondary_ip,
4735 "total_cpus": remote_info['cpu_total'],
4737 node_results[nname] = pnr
4738 data["nodes"] = node_results
4742 for iinfo in i_list:
4743 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4744 for n in iinfo.nics]
4746 "tags": list(iinfo.GetTags()),
4747 "should_run": iinfo.status == "up",
4748 "vcpus": iinfo.vcpus,
4749 "memory": iinfo.memory,
4751 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4753 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4754 "disk_template": iinfo.disk_template,
4756 instance_data[iinfo.name] = pir
4758 data["instances"] = instance_data
4762 def _AddNewInstance(self):
4763 """Add new instance data to allocator structure.
4765 This in combination with _AllocatorGetClusterData will create the
4766 correct structure needed as input for the allocator.
4768 The checks for the completeness of the opcode must have already been
4773 if len(self.disks) != 2:
4774 raise errors.OpExecError("Only two-disk configurations supported")
4776 disk_space = _ComputeDiskSize(self.disk_template,
4777 self.disks[0]["size"], self.disks[1]["size"])
4779 if self.disk_template in constants.DTS_NET_MIRROR:
4780 self.required_nodes = 2
4782 self.required_nodes = 1
4786 "disk_template": self.disk_template,
4789 "vcpus": self.vcpus,
4790 "memory": self.mem_size,
4791 "disks": self.disks,
4792 "disk_space_total": disk_space,
4794 "required_nodes": self.required_nodes,
4796 data["request"] = request
4798 def _AddRelocateInstance(self):
4799 """Add relocate instance data to allocator structure.
4801 This in combination with _IAllocatorGetClusterData will create the
4802 correct structure needed as input for the allocator.
4804 The checks for the completeness of the opcode must have already been
4808 instance = self.cfg.GetInstanceInfo(self.name)
4809 if instance is None:
4810 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4811 " IAllocator" % self.name)
4813 if instance.disk_template not in constants.DTS_NET_MIRROR:
4814 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4816 if len(instance.secondary_nodes) != 1:
4817 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4819 self.required_nodes = 1
4821 disk_space = _ComputeDiskSize(instance.disk_template,
4822 instance.disks[0].size,
4823 instance.disks[1].size)
4828 "disk_space_total": disk_space,
4829 "required_nodes": self.required_nodes,
4830 "relocate_from": self.relocate_from,
4832 self.in_data["request"] = request
4834 def _BuildInputData(self):
4835 """Build input data structures.
4838 self._ComputeClusterData()
4840 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4841 self._AddNewInstance()
4843 self._AddRelocateInstance()
4845 self.in_text = serializer.Dump(self.in_data)
4847 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4848 """Run an instance allocator and return the results.
4853 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4855 if not isinstance(result, tuple) or len(result) != 4:
4856 raise errors.OpExecError("Invalid result from master iallocator runner")
4858 rcode, stdout, stderr, fail = result
4860 if rcode == constants.IARUN_NOTFOUND:
4861 raise errors.OpExecError("Can't find allocator '%s'" % name)
4862 elif rcode == constants.IARUN_FAILURE:
4863 raise errors.OpExecError("Instance allocator call failed: %s,"
4865 (fail, stdout+stderr))
4866 self.out_text = stdout
4868 self._ValidateResult()
4870 def _ValidateResult(self):
4871 """Process the allocator results.
4873 This will process and if successful save the result in
4874 self.out_data and the other parameters.
4878 rdict = serializer.Load(self.out_text)
4879 except Exception, err:
4880 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4882 if not isinstance(rdict, dict):
4883 raise errors.OpExecError("Can't parse iallocator results: not a dict")
4885 for key in "success", "info", "nodes":
4886 if key not in rdict:
4887 raise errors.OpExecError("Can't parse iallocator results:"
4888 " missing key '%s'" % key)
4889 setattr(self, key, rdict[key])
4891 if not isinstance(rdict["nodes"], list):
4892 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4894 self.out_data = rdict
4897 class LUTestAllocator(NoHooksLU):
4898 """Run allocator tests.
4900 This LU runs the allocator tests
4903 _OP_REQP = ["direction", "mode", "name"]
4905 def CheckPrereq(self):
4906 """Check prerequisites.
4908 This checks the opcode parameters depending on the director and mode test.
4911 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4912 for attr in ["name", "mem_size", "disks", "disk_template",
4913 "os", "tags", "nics", "vcpus"]:
4914 if not hasattr(self.op, attr):
4915 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4917 iname = self.cfg.ExpandInstanceName(self.op.name)
4918 if iname is not None:
4919 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4921 if not isinstance(self.op.nics, list):
4922 raise errors.OpPrereqError("Invalid parameter 'nics'")
4923 for row in self.op.nics:
4924 if (not isinstance(row, dict) or
4927 "bridge" not in row):
4928 raise errors.OpPrereqError("Invalid contents of the"
4929 " 'nics' parameter")
4930 if not isinstance(self.op.disks, list):
4931 raise errors.OpPrereqError("Invalid parameter 'disks'")
4932 if len(self.op.disks) != 2:
4933 raise errors.OpPrereqError("Only two-disk configurations supported")
4934 for row in self.op.disks:
4935 if (not isinstance(row, dict) or
4936 "size" not in row or
4937 not isinstance(row["size"], int) or
4938 "mode" not in row or
4939 row["mode"] not in ['r', 'w']):
4940 raise errors.OpPrereqError("Invalid contents of the"
4941 " 'disks' parameter")
4942 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4943 if not hasattr(self.op, "name"):
4944 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4945 fname = self.cfg.ExpandInstanceName(self.op.name)
4947 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4949 self.op.name = fname
4950 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4952 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
4955 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
4956 if not hasattr(self.op, "allocator") or self.op.allocator is None:
4957 raise errors.OpPrereqError("Missing allocator name")
4958 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
4959 raise errors.OpPrereqError("Wrong allocator test '%s'" %
4962 def Exec(self, feedback_fn):
4963 """Run the allocator test.
4966 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4967 ial = IAllocator(self.cfg, self.sstore,
4970 mem_size=self.op.mem_size,
4971 disks=self.op.disks,
4972 disk_template=self.op.disk_template,
4976 vcpus=self.op.vcpus,
4979 ial = IAllocator(self.cfg, self.sstore,
4982 relocate_from=list(self.relocate_from),
4985 if self.op.direction == constants.IALLOCATOR_DIR_IN:
4986 result = ial.in_text
4988 ial.Run(self.op.allocator, validate=False)
4989 result = ial.out_text