# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201

import os
import re
import time
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement CheckPrereq which also fills in the opcode instance
      with all the fields (even if as None)
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements (REQ_MASTER); note that all
      commands require root permissions

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True

  def __init__(self, processor, op, cfg, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = cfg
    self.sstore = sstore
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)
95 """Returns the SshRunner object
99 self.__ssh = ssh.SshRunner(self.sstore)
102 ssh = property(fget=__GetSSH)

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form; e.g. a short node name must be fully
    expanded after this method has successfully completed (so that
    hooks, logging, etc. work correctly).

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not be prefixed with 'GANETI_', as this
    prefix is handled in the hooks runner. Also note that additional
    keys will be added by the hooks runner. If the LU doesn't define
    any environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged, but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hook_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result
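

# Illustrative sketch (not part of the original code): a minimal LU
# subclass wiring the pieces above together.  The opcode and its
# "nodes" field are hypothetical examples only.
#
#   class LUExampleListNodes(NoHooksLU):
#     _OP_REQP = ["nodes"]
#
#     def CheckPrereq(self):
#       # canonicalize the opcode fields, as required by the base class
#       self.wanted = _GetWantedNodes(self, self.op.nodes)
#
#     def Exec(self, feedback_fn):
#       feedback_fn("* Listing %d node(s)" % len(self.wanted))
#       return self.wanted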


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of node names (strings); an empty list means all nodes

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []

    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instance names (strings); an empty list means all
               instances

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []

    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)
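

# Usage sketch for the two helpers above (illustrative names only):
# inside a LU's CheckPrereq, _GetWantedNodes(self, ["node1"]) expands
# the short name via cfg.ExpandNodeName and returns the nice-sorted,
# fully-expanded list, while _GetWantedNodes(self, []) returns every
# node in the configuration.  Passing anything but a list raises
# errors.OpPrereqError.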


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: The selected output fields

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
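

# Example (illustrative field names): with static=["name"] and
# dynamic=["mfree"], selected=["name", "mfree"] passes silently, while
# selected=["name", "bogus"] raises OpPrereqError naming the unknown
# "bogus" field.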


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "OP_TARGET": name,
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
    }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env
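

# Example of the resulting environment (illustrative values), for an
# instance "inst1" with one NIC:
#
#   _BuildInstanceHookEnv("inst1", "node1", ["node2"], "debian-etch",
#                         "up", 512, 1,
#                         [("192.0.2.10", "xen-br0", "aa:00:00:00:00:01")])
#
# returns a dict mapping INSTANCE_NAME -> "inst1", INSTANCE_PRIMARY ->
# "node1", INSTANCE_SECONDARIES -> "node2", INSTANCE_NIC_COUNT -> 1,
# INSTANCE_NIC0_IP -> "192.0.2.10", and so on.  The hooks runner later
# adds the GANETI_ prefix to each key.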


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
    }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)
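

# Usage sketch (illustrative): given an objects.Instance `inst`, a LU
# can extend the standard environment with LU-specific overrides, e.g.
#
#   env = _BuildInstanceHookEnvByObject(inst, override={'status': 'up'})
#
# which replaces only the 'status' argument before delegating to
# _BuildInstanceHookEnv.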


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G

    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum

    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    # checks ssh to other nodes

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    else:
      if node_result['nodelist']:
        bad = True
        for node in node_result['nodelist']:
          feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                      (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    else:
      if node_result['node-net-test']:
        bad = True
        nlist = utils.NiceSort(node_result['node-net-test'].keys())
        for node in nlist:
          feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                      (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'hypervisor': None,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo]
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result
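

# Shape of the tuple returned by LUVerifyDisks.Exec (illustrative
# values): with one unreachable node and one instance with an offline
# LV, the result looks like
#   (["node3"],                         # nodes that could not be queried
#    {"node2": "lvs command failed"},   # per-node LVM enumeration errors
#    ["inst1"],                         # instances with offline LVs
#    {"inst2": [("node1", "xenvg/disk0")]})  # missing LVs per instance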


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      result = utils.RunCmd(["fping", "-q", new_ip])
      if not result.failed:
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
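

# Example (illustrative): for a DRBD disk whose children are two LVs,
# _RecursiveCheckIfLVMBased returns True via the recursive call on the
# children, while a disk with no children returns True or False purely
# based on whether its own dev_type is constants.LD_LV.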


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    if unlock:
      #utils.Unlock('cmd')
      pass

    try:
      time.sleep(min(60, max_time))
    finally:
      if unlock:
        #utils.Lock('cmd')
        pass

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)
  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

      Args:
        node_list: a list with the names of all nodes
        rlist: a map with node names as keys and OS objects as values

      Returns:
        map: a map with osnames as keys and as value another map, with
             node names as keys and list of OS objects as values
             e.g. {"debian-etch": {"node1": [<object>,...],
                                   "node2": [<object>,]}
             }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output
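

# Example of LUDiagnoseOS.Exec output (illustrative): with
# output_fields == ["name", "valid"], each row looks like
# ["debian-etch", True], where "valid" is true only if the OS is
# present and valid on every queried node.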


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would not allow itself to run.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the node exists in the configuration
     - it does not have primary or secondary instances
     - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))

    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
     - the new node is not already in the config
     - it is resolvable
     - its parameters (single/dual homed) match the cluster

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # checks reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      if not os.path.exists(constants.VNC_PASSWORD_FILE):
        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
                                   constants.VNC_PASSWORD_FILE)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # set up inter-node password and certificate and restarts the node daemon
    gntpass = self.sstore.GetNodeDaemonPassword()
    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
      raise errors.OpExecError("ganeti password corruption detected")
    f = open(constants.SSL_CERT_FILE)
    try:
      gntpem = f.read(8192)
    finally:
      f.close()
    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
    # so we use this to detect an invalid certificate; as long as the
    # cert doesn't contain this, the here-document will be correctly
    # parsed by the shell sequence below
    if re.search('^!EOF\.', gntpem, re.MULTILINE):
      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
    if not gntpem.endswith("\n"):
      raise errors.OpExecError("PEM must end with newline")
    logger.Info("copy cluster pass to %s and starting the node daemon" % node)

    # and then connect with ssh to set password and start ganeti-noded
    # note that all the below variables are sanitized at this point,
    # either by being constants or by the checks above
    ss = self.sstore
    mycommand = ("umask 077 && "
                 "echo '%s' > '%s' && "
                 "cat > '%s' << '!EOF.' && \n"
                 "%s!EOF.\n%s restart" %
                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
                  constants.SSL_CERT_FILE, gntpem,
                  constants.NODE_INITD_SCRIPT))

    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
    if result.failed:
      raise errors.OpExecError("Remote command on node %s, error: %s,"
                               " output: %s" %
                               (node, result.fail_reason, result.output))

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    success, msg = self.ssh.VerifyNodeHostname(node)
    if not success:
      raise errors.OpExecError("Node '%s' claims it has a different hostname"
                               " than the one the resolver gives: %s."
                               " Please fix and re-run this command." %
                               (node, msg))

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = ss.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      if not self.ssh.CopyFileToNode(node, fname):
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)


class LUMasterFailover(LogicalUnit):
  """Failover the master node to the current node.

  This is a special LU in that it must run on a non-master node.

  """
  HPATH = "master-failover"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_MASTER = False
  _OP_REQP = []

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the new master only in the pre phase, and on all
    the nodes in the post phase.

    """
    env = {
      "OP_TARGET": self.new_master,
      "NEW_MASTER": self.new_master,
      "OLD_MASTER": self.old_master,
      }
    return env, [self.new_master], self.cfg.GetNodeList()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we are not already the master.

    """
    self.new_master = utils.HostInfo().name
    self.old_master = self.sstore.GetMasterNode()

    if self.old_master == self.new_master:
      raise errors.OpPrereqError("This command must be run on the node"
                                 " where you want the new master to be."
                                 " %s is already the master" %
                                 self.old_master)

  def Exec(self, feedback_fn):
    """Failover the master node.

    This command, when run on a non-master node, will cause the current
    master to cease being master, and the non-master to become new
    master.

    """
    #TODO: do not rely on gethostname returning the FQDN
    logger.Info("setting master to %s, old master: %s" %
                (self.new_master, self.old_master))

    if not rpc.call_node_stop_master(self.old_master):
      logger.Error("could not disable the master role on the old master"
                   " %s, please disable manually" % self.old_master)

    ss = self.sstore
    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
    if not rpc.call_upload_file(self.cfg.GetNodeList(),
                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
      logger.Error("could not distribute the new simple store master file"
                   " to the other nodes, please check.")

    if not rpc.call_node_start_master(self.new_master):
      logger.Error("could not start the master role on the new master"
                   " %s, please check" % self.new_master)
      feedback_fn("Error in activating the master IP on the new master,"
                  " please fix manually.")


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUClusterCopyFile(NoHooksLU):
  """Copy file to cluster.

  """
  _OP_REQP = ["nodes", "filename"]

  def CheckPrereq(self):
    """Check prerequisites.

    It should check that the named file exists and that the given list
    of nodes is valid.

    """
    if not os.path.exists(self.op.filename):
      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)

    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Copy a file from master to some nodes.

    Args:
      opts - class with options as members
      args - list containing a single element, the file name
    Opts used:
      nodes - list containing the name of target nodes; if empty, all nodes

    """
    filename = self.op.filename

    myname = utils.HostInfo().name

    for node in self.nodes:
      if node == myname:
        continue
      if not self.ssh.CopyFileToNode(node, filename):
        logger.Error("Copy of file %s to node %s failed" % (filename, node))


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LURunClusterCommand(NoHooksLU):
  """Run a command on some nodes.

  """
  _OP_REQP = ["command", "nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    It checks that the given list of nodes is valid.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

  def Exec(self, feedback_fn):
    """Run a command on some nodes.

    """
    # put the master at the end of the nodes list
    master_node = self.sstore.GetMasterNode()
    if master_node in self.nodes:
      self.nodes.remove(master_node)
      self.nodes.append(master_node)

    data = []
    for node in self.nodes:
      result = self.ssh.Run(node, "root", self.op.command)
      data.append((node, result.output, result.exit_code))

    return data


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
         succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info
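

# Usage sketch (illustrative): callers treat the two return values as a
# success flag plus a device mapping, e.g.
#
#   disks_ok, device_info = _AssembleInstanceDisks(instance, cfg)
#   if not disks_ok:
#     raise errors.OpExecError("Cannot activate block devices")
#   for node, iv_name, dev in device_info:
#     feedback_fn("%s: %s -> %s" % (node, iv_name, dev))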


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is true, errors on the primary node are ignored;
  otherwise they mark the whole operation as failed.

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks whether a given node has the needed amount of
  free memory. If the node has less memory, or the information cannot
  be retrieved from the node, it raises an OpPrereqError exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
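

# Usage sketch (mirroring the call in LUStartupInstance below): before
# starting an instance, a LU calls
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)
#
# which raises OpPrereqError unless the node reports at least
# instance.memory MiB of free memory.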


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

    self.instance = instance
    self.op.instance_name = instance.name

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
2106 class LURebootInstance(LogicalUnit):
2107 """Reboot an instance.
2110 HPATH = "instance-reboot"
2111 HTYPE = constants.HTYPE_INSTANCE
2112 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2114 def BuildHooksEnv(self):
2117 This runs on master, primary and secondary nodes of the instance.
2121 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2123 env.update(_BuildInstanceHookEnvByObject(self.instance))
2124 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2125 list(self.instance.secondary_nodes))
2128 def CheckPrereq(self):
2129 """Check prerequisites.
2131 This checks that the instance is in the cluster.
2134 instance = self.cfg.GetInstanceInfo(
2135 self.cfg.ExpandInstanceName(self.op.instance_name))
2136 if instance is None:
2137 raise errors.OpPrereqError("Instance '%s' not known" %
2138 self.op.instance_name)
2140 # check bridges existence
2141 _CheckInstanceBridgesExist(instance)
2143 self.instance = instance
2144 self.op.instance_name = instance.name
2146 def Exec(self, feedback_fn):
2147 """Reboot the instance.
2150 instance = self.instance
2151 ignore_secondaries = self.op.ignore_secondaries
2152 reboot_type = self.op.reboot_type
2153 extra_args = getattr(self.op, "extra_args", "")
2155 node_current = instance.primary_node
2157 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2158 constants.INSTANCE_REBOOT_HARD,
2159 constants.INSTANCE_REBOOT_FULL]:
2160 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2161 (constants.INSTANCE_REBOOT_SOFT,
2162 constants.INSTANCE_REBOOT_HARD,
2163 constants.INSTANCE_REBOOT_FULL))
2165 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2166 constants.INSTANCE_REBOOT_HARD]:
2167 if not rpc.call_instance_reboot(node_current, instance,
2168 reboot_type, extra_args):
2169 raise errors.OpExecError("Could not reboot instance")
2171 if not rpc.call_instance_shutdown(node_current, instance):
2172 raise errors.OpExecError("could not shutdown instance for full reboot")
2173 _ShutdownInstanceDisks(instance, self.cfg)
2174 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2175 if not rpc.call_instance_start(node_current, instance, extra_args):
2176 _ShutdownInstanceDisks(instance, self.cfg)
2177 raise errors.OpExecError("Could not start instance for full reboot")
2179 self.cfg.MarkInstanceUp(instance.name)
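# Note on the reboot modes above: SOFT and HARD are delegated to the node
# via rpc.call_instance_reboot, while FULL is emulated master-side as
# shutdown + disk cycle + start. A sketch of the corresponding opcode
# (field names as required by _OP_REQP above; values illustrative):
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_FULL,
#                                 ignore_secondaries=False)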
2182 class LUShutdownInstance(LogicalUnit):
2183 """Shutdown an instance.
2186 HPATH = "instance-stop"
2187 HTYPE = constants.HTYPE_INSTANCE
2188 _OP_REQP = ["instance_name"]
2190 def BuildHooksEnv(self):
2193 This runs on master, primary and secondary nodes of the instance.
2196 env = _BuildInstanceHookEnvByObject(self.instance)
2197 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2198 list(self.instance.secondary_nodes))
2201 def CheckPrereq(self):
2202 """Check prerequisites.
2204 This checks that the instance is in the cluster.
2207 instance = self.cfg.GetInstanceInfo(
2208 self.cfg.ExpandInstanceName(self.op.instance_name))
2209 if instance is None:
2210 raise errors.OpPrereqError("Instance '%s' not known" %
2211 self.op.instance_name)
2212 self.instance = instance
2214 def Exec(self, feedback_fn):
2215 """Shutdown the instance.
2218 instance = self.instance
2219 node_current = instance.primary_node
2220 self.cfg.MarkInstanceDown(instance.name)
2221 if not rpc.call_instance_shutdown(node_current, instance):
2222 logger.Error("could not shutdown instance")
2224 _ShutdownInstanceDisks(instance, self.cfg)
2227 class LUReinstallInstance(LogicalUnit):
2228 """Reinstall an instance.
2231 HPATH = "instance-reinstall"
2232 HTYPE = constants.HTYPE_INSTANCE
2233 _OP_REQP = ["instance_name"]
2235 def BuildHooksEnv(self):
2238 This runs on master, primary and secondary nodes of the instance.
2241 env = _BuildInstanceHookEnvByObject(self.instance)
2242 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2243 list(self.instance.secondary_nodes))
2246 def CheckPrereq(self):
2247 """Check prerequisites.
2249 This checks that the instance is in the cluster and is not running.
2252 instance = self.cfg.GetInstanceInfo(
2253 self.cfg.ExpandInstanceName(self.op.instance_name))
2254 if instance is None:
2255 raise errors.OpPrereqError("Instance '%s' not known" %
2256 self.op.instance_name)
2257 if instance.disk_template == constants.DT_DISKLESS:
2258 raise errors.OpPrereqError("Instance '%s' has no disks" %
2259 self.op.instance_name)
2260 if instance.status != "down":
2261 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2262 self.op.instance_name)
2263 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2265 raise errors.OpPrereqError("Instance '%s' is running on node %s" %
2266 (self.op.instance_name,
2267 instance.primary_node))
2269 self.op.os_type = getattr(self.op, "os_type", None)
2270 if self.op.os_type is not None:
2272 pnode = self.cfg.GetNodeInfo(
2273 self.cfg.ExpandNodeName(instance.primary_node))
2275 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2277 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2279 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2280 " primary node" % self.op.os_type)
2282 self.instance = instance
2284 def Exec(self, feedback_fn):
2285 """Reinstall the instance.
2288 inst = self.instance
2290 if self.op.os_type is not None:
2291 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2292 inst.os = self.op.os_type
2293 self.cfg.AddInstance(inst)
2295 _StartInstanceDisks(self.cfg, inst, None)
2297 feedback_fn("Running the instance OS create scripts...")
2298 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2299 raise errors.OpExecError("Could not install OS for instance %s"
2301 (inst.name, inst.primary_node))
2303 _ShutdownInstanceDisks(inst, self.cfg)
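# Illustrative comment: a reinstall that also switches the guest OS would
# be requested roughly as below (os_type is optional; when omitted the
# instance keeps its current OS; values are examples only):
#
#   op = opcodes.OpReinstallInstance(instance_name="inst1.example.com",
#                                    os_type="debian-etch")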
2306 class LURenameInstance(LogicalUnit):
2307 """Rename an instance.
2310 HPATH = "instance-rename"
2311 HTYPE = constants.HTYPE_INSTANCE
2312 _OP_REQP = ["instance_name", "new_name"]
2314 def BuildHooksEnv(self):
2317 This runs on master, primary and secondary nodes of the instance.
2320 env = _BuildInstanceHookEnvByObject(self.instance)
2321 env["INSTANCE_NEW_NAME"] = self.op.new_name
2322 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2323 list(self.instance.secondary_nodes))
2326 def CheckPrereq(self):
2327 """Check prerequisites.
2329 This checks that the instance is in the cluster and is not running.
2332 instance = self.cfg.GetInstanceInfo(
2333 self.cfg.ExpandInstanceName(self.op.instance_name))
2334 if instance is None:
2335 raise errors.OpPrereqError("Instance '%s' not known" %
2336 self.op.instance_name)
2337 if instance.status != "down":
2338 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2339 self.op.instance_name)
2340 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2342 raise errors.OpPrereqError("Instance '%s' is running on node %s" %
2343 (self.op.instance_name,
2344 instance.primary_node))
2345 self.instance = instance
2347 # new name verification
2348 name_info = utils.HostInfo(self.op.new_name)
2350 self.op.new_name = new_name = name_info.name
2351 instance_list = self.cfg.GetInstanceList()
2352 if new_name in instance_list:
2353 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2356 if not getattr(self.op, "ignore_ip", False):
2357 command = ["fping", "-q", name_info.ip]
2358 result = utils.RunCmd(command)
2359 if not result.failed:
2360 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2361 (name_info.ip, new_name))
2364 def Exec(self, feedback_fn):
2365 """Reinstall the instance.
2368 inst = self.instance
2369 old_name = inst.name
2371 if inst.disk_template == constants.DT_FILE:
2372 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2374 self.cfg.RenameInstance(inst.name, self.op.new_name)
2376 # re-read the instance from the configuration after rename
2377 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2379 if inst.disk_template == constants.DT_FILE:
2380 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2381 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2382 old_file_storage_dir,
2383 new_file_storage_dir)
2386 raise errors.OpExecError("Could not connect to node '%s' to rename"
2387 " directory '%s' to '%s' (but the instance"
2388 " has been renamed in Ganeti)" % (
2389 inst.primary_node, old_file_storage_dir,
2390 new_file_storage_dir))
2393 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2394 " (but the instance has been renamed in"
2395 " Ganeti)" % (old_file_storage_dir,
2396 new_file_storage_dir))
2398 _StartInstanceDisks(self.cfg, inst, None)
2400 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2402 msg = ("Could run OS rename script for instance %s on node %s (but the"
2403 " instance has been renamed in Ganeti)" %
2404 (inst.name, inst.primary_node))
2407 _ShutdownInstanceDisks(inst, self.cfg)
2410 class LURemoveInstance(LogicalUnit):
2411 """Remove an instance.
2414 HPATH = "instance-remove"
2415 HTYPE = constants.HTYPE_INSTANCE
2416 _OP_REQP = ["instance_name", "ignore_failures"]
2418 def BuildHooksEnv(self):
2421 This runs on master, primary and secondary nodes of the instance.
2424 env = _BuildInstanceHookEnvByObject(self.instance)
2425 nl = [self.sstore.GetMasterNode()]
2428 def CheckPrereq(self):
2429 """Check prerequisites.
2431 This checks that the instance is in the cluster.
2434 instance = self.cfg.GetInstanceInfo(
2435 self.cfg.ExpandInstanceName(self.op.instance_name))
2436 if instance is None:
2437 raise errors.OpPrereqError("Instance '%s' not known" %
2438 self.op.instance_name)
2439 self.instance = instance
2441 def Exec(self, feedback_fn):
2442 """Remove the instance.
2445 instance = self.instance
2446 logger.Info("shutting down instance %s on node %s" %
2447 (instance.name, instance.primary_node))
2449 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2450 if self.op.ignore_failures:
2451 feedback_fn("Warning: can't shutdown instance")
2453 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2454 (instance.name, instance.primary_node))
2456 logger.Info("removing block devices for instance %s" % instance.name)
2458 if not _RemoveDisks(instance, self.cfg):
2459 if self.op.ignore_failures:
2460 feedback_fn("Warning: can't remove instance's disks")
2462 raise errors.OpExecError("Can't remove instance's disks")
2464 logger.Info("removing instance %s out of cluster config" % instance.name)
2466 self.cfg.RemoveInstance(instance.name)
2469 class LUQueryInstances(NoHooksLU):
2470 """Logical unit for querying instances.
2473 _OP_REQP = ["output_fields", "names"]
2475 def CheckPrereq(self):
2476 """Check prerequisites.
2478 This checks that the fields required are valid output fields.
2481 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2482 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2483 "admin_state", "admin_ram",
2484 "disk_template", "ip", "mac", "bridge",
2485 "sda_size", "sdb_size", "vcpus"],
2486 dynamic=self.dynamic_fields,
2487 selected=self.op.output_fields)
2489 self.wanted = _GetWantedInstances(self, self.op.names)
2491 def Exec(self, feedback_fn):
2492 """Computes the list of nodes and their attributes.
2495 instance_names = self.wanted
2496 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2499 # begin data gathering
2501 nodes = frozenset([inst.primary_node for inst in instance_list])
2504 if self.dynamic_fields.intersection(self.op.output_fields):
2506 node_data = rpc.call_all_instances_info(nodes)
2508 result = node_data[name]
2510 live_data.update(result)
2511 elif result is False:
2512 bad_nodes.append(name)
2513 # else no instance is alive
2515 live_data = dict([(name, {}) for name in instance_names])
2517 # end data gathering
2520 for instance in instance_list:
2522 for field in self.op.output_fields:
2527 elif field == "pnode":
2528 val = instance.primary_node
2529 elif field == "snodes":
2530 val = list(instance.secondary_nodes)
2531 elif field == "admin_state":
2532 val = (instance.status != "down")
2533 elif field == "oper_state":
2534 if instance.primary_node in bad_nodes:
2537 val = bool(live_data.get(instance.name))
2538 elif field == "status":
2539 if instance.primary_node in bad_nodes:
2540 val = "ERROR_nodedown"
2542 running = bool(live_data.get(instance.name))
2544 if instance.status != "down":
2549 if instance.status != "down":
2553 elif field == "admin_ram":
2554 val = instance.memory
2555 elif field == "oper_ram":
2556 if instance.primary_node in bad_nodes:
2558 elif instance.name in live_data:
2559 val = live_data[instance.name].get("memory", "?")
2562 elif field == "disk_template":
2563 val = instance.disk_template
2565 val = instance.nics[0].ip
2566 elif field == "bridge":
2567 val = instance.nics[0].bridge
2568 elif field == "mac":
2569 val = instance.nics[0].mac
2570 elif field in ("sda_size", "sdb_size"):
2571 disk = instance.FindDisk(field[:3])
2576 elif field == "vcpus":
2577 val = instance.vcpus
2579 raise errors.ParameterError(field)
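# Sketch of the result shape, assuming the usual one-row-per-instance,
# one-value-per-field convention of the query LUs (values illustrative;
# the exact status strings come from the branches above, e.g.
# "ERROR_nodedown"):
#
#   output_fields = ["name", "pnode", "oper_ram"]
#   # -> [["inst1", "node1", 512],
#   #     ["inst2", "node2", "?"], ...]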
2586 class LUFailoverInstance(LogicalUnit):
2587 """Failover an instance.
2590 HPATH = "instance-failover"
2591 HTYPE = constants.HTYPE_INSTANCE
2592 _OP_REQP = ["instance_name", "ignore_consistency"]
2594 def BuildHooksEnv(self):
2597 This runs on master, primary and secondary nodes of the instance.
2601 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2603 env.update(_BuildInstanceHookEnvByObject(self.instance))
2604 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2607 def CheckPrereq(self):
2608 """Check prerequisites.
2610 This checks that the instance is in the cluster.
2613 instance = self.cfg.GetInstanceInfo(
2614 self.cfg.ExpandInstanceName(self.op.instance_name))
2615 if instance is None:
2616 raise errors.OpPrereqError("Instance '%s' not known" %
2617 self.op.instance_name)
2619 if instance.disk_template not in constants.DTS_NET_MIRROR:
2620 raise errors.OpPrereqError("Instance's disk layout is not"
2621 " network mirrored, cannot failover.")
2623 secondary_nodes = instance.secondary_nodes
2624 if not secondary_nodes:
2625 raise errors.ProgrammerError("no secondary node but using "
2626 "a mirrored disk template")
2628 target_node = secondary_nodes[0]
2629 # check memory requirements on the secondary node
2630 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2631 instance.name, instance.memory)
2633 # check bridge existence
2634 brlist = [nic.bridge for nic in instance.nics]
2635 if not rpc.call_bridges_exist(target_node, brlist):
2636 raise errors.OpPrereqError("One or more target bridges %s does not"
2637 " exist on destination node '%s'" %
2638 (brlist, target_node))
2640 self.instance = instance
2642 def Exec(self, feedback_fn):
2643 """Failover an instance.
2645 The failover is done by shutting it down on its present node and
2646 starting it on the secondary.
2649 instance = self.instance
2651 source_node = instance.primary_node
2652 target_node = instance.secondary_nodes[0]
2654 feedback_fn("* checking disk consistency between source and target")
2655 for dev in instance.disks:
2656 # for drbd, these are drbd over lvm
2657 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2658 if instance.status == "up" and not self.op.ignore_consistency:
2659 raise errors.OpExecError("Disk %s is degraded on target node,"
2660 " aborting failover." % dev.iv_name)
2662 feedback_fn("* shutting down instance on source node")
2663 logger.Info("Shutting down instance %s on node %s" %
2664 (instance.name, source_node))
2666 if not rpc.call_instance_shutdown(source_node, instance):
2667 if self.op.ignore_consistency:
2668 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2669 " anyway. Please make sure node %s is down" %
2670 (instance.name, source_node, source_node))
2672 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2673 (instance.name, source_node))
2675 feedback_fn("* deactivating the instance's disks on source node")
2676 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2677 raise errors.OpExecError("Can't shut down the instance's disks.")
2679 instance.primary_node = target_node
2680 # distribute new instance config to the other nodes
2681 self.cfg.AddInstance(instance)
2683 # Only start the instance if it's marked as up
2684 if instance.status == "up":
2685 feedback_fn("* activating the instance's disks on target node")
2686 logger.Info("Starting instance %s on node %s" %
2687 (instance.name, target_node))
2689 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2690 ignore_secondaries=True)
2692 _ShutdownInstanceDisks(instance, self.cfg)
2693 raise errors.OpExecError("Can't activate the instance's disks")
2695 feedback_fn("* starting the instance on the target node")
2696 if not rpc.call_instance_start(target_node, instance, None):
2697 _ShutdownInstanceDisks(instance, self.cfg)
2698 raise errors.OpExecError("Could not start instance %s on node %s." %
2699 (instance.name, target_node))
2702 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2703 """Create a tree of block devices on the primary node.
2705 This always creates all devices.
2709 for child in device.children:
2710 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2713 cfg.SetDiskID(device, node)
2714 new_id = rpc.call_blockdev_create(node, device, device.size,
2715 instance.name, True, info)
2718 if device.physical_id is None:
2719 device.physical_id = new_id
2723 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2724 """Create a tree of block devices on a secondary node.
2726 If this device type has to be created on secondaries, create it and
2729 If not, just recurse to children keeping the same 'force' value.
2732 if device.CreateOnSecondary():
2735 for child in device.children:
2736 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2737 child, force, info):
2742 cfg.SetDiskID(device, node)
2743 new_id = rpc.call_blockdev_create(node, device, device.size,
2744 instance.name, False, info)
2747 if device.physical_id is None:
2748 device.physical_id = new_id
2752 def _GenerateUniqueNames(cfg, exts):
2753 """Generate a suitable LV name.
2755 This will generate a logical volume name for the given instance.
2760 new_id = cfg.GenerateUniqueID()
2761 results.append("%s%s" % (new_id, val))
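# Example (illustrative): for exts == [".sda_data", ".sda_meta"] this
# yields something like ["<uuid>.sda_data", "<uuid>.sda_meta"], with a
# fresh unique ID generated per extension (GenerateUniqueID is assumed to
# return a UUID-like string).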
2765 def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
2766 """Generate a drbd device complete with its children.
2769 port = cfg.AllocatePort()
2770 vgname = cfg.GetVGName()
2771 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2772 logical_id=(vgname, names[0]))
2773 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2774 logical_id=(vgname, names[1]))
2775 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
2776 logical_id = (primary, secondary, port),
2777 children = [dev_data, dev_meta])
2781 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2782 """Generate a drbd8 device complete with its children.
2785 port = cfg.AllocatePort()
2786 vgname = cfg.GetVGName()
2787 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2788 logical_id=(vgname, names[0]))
2789 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2790 logical_id=(vgname, names[1]))
2791 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2792 logical_id = (primary, secondary, port),
2793 children = [dev_data, dev_meta],
2798 def _GenerateDiskTemplate(cfg, template_name,
2799 instance_name, primary_node,
2800 secondary_nodes, disk_sz, swap_sz,
2801 file_storage_dir, file_driver):
2802 """Generate the entire disk layout for a given template type.
2805 # TODO: compute space requirements
2807 vgname = cfg.GetVGName()
2808 if template_name == constants.DT_DISKLESS:
2810 elif template_name == constants.DT_PLAIN:
2811 if len(secondary_nodes) != 0:
2812 raise errors.ProgrammerError("Wrong template configuration")
2814 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2815 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2816 logical_id=(vgname, names[0]),
2818 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2819 logical_id=(vgname, names[1]),
2821 disks = [sda_dev, sdb_dev]
2822 elif template_name == constants.DT_DRBD8:
2823 if len(secondary_nodes) != 1:
2824 raise errors.ProgrammerError("Wrong template configuration")
2825 remote_node = secondary_nodes[0]
2826 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2827 ".sdb_data", ".sdb_meta"])
2828 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2829 disk_sz, names[0:2], "sda")
2830 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2831 swap_sz, names[2:4], "sdb")
2832 disks = [drbd_sda_dev, drbd_sdb_dev]
2833 elif template_name == constants.DT_FILE:
2834 if len(secondary_nodes) != 0:
2835 raise errors.ProgrammerError("Wrong template configuration")
2837 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2838 iv_name="sda", logical_id=(file_driver,
2839 "%s/sda" % file_storage_dir))
2840 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2841 iv_name="sdb", logical_id=(file_driver,
2842 "%s/sdb" % file_storage_dir))
2843 disks = [file_sda_dev, file_sdb_dev]
2845 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
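# Summary of the layouts built above, per template (sizes in MiB; the
# function returns the computed disks list):
#
#   DT_DISKLESS -> []
#   DT_PLAIN    -> [LV sda (disk_sz), LV sdb (swap_sz)]
#   DT_DRBD8    -> [DRBD8 sda over data LV (disk_sz) + meta LV (128),
#                   DRBD8 sdb over data LV (swap_sz) + meta LV (128)]
#   DT_FILE     -> [file-backed sda and sdb under file_storage_dir]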
2849 def _GetInstanceInfoText(instance):
2850 """Compute that text that should be added to the disk's metadata.
2853 return "originstname+%s" % instance.name
2856 def _CreateDisks(cfg, instance):
2857 """Create all disks for an instance.
2859 This abstracts away some work from AddInstance.
2862 instance: the instance object
2865 True or False showing the success of the creation process
2868 info = _GetInstanceInfoText(instance)
2870 if instance.disk_template == constants.DT_FILE:
2871 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2872 result = rpc.call_file_storage_dir_create(instance.primary_node,
2876 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2880 logger.Error("failed to create directory '%s'" % file_storage_dir)
2883 for device in instance.disks:
2884 logger.Info("creating volume %s for instance %s" %
2885 (device.iv_name, instance.name))
2887 for secondary_node in instance.secondary_nodes:
2888 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2889 device, False, info):
2890 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2891 (device.iv_name, device, secondary_node))
2894 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2895 instance, device, info):
2896 logger.Error("failed to create volume %s on primary!" %
2903 def _RemoveDisks(instance, cfg):
2904 """Remove all disks for an instance.
2906 This abstracts away some work from `AddInstance()` and
2907 `RemoveInstance()`. Note that in case some of the devices couldn't
2908 be removed, the removal will continue with the other ones (compare
2909 with `_CreateDisks()`).
2912 instance: the instance object
2915 True or False showing the success of the removal process
2918 logger.Info("removing block devices for instance %s" % instance.name)
2921 for device in instance.disks:
2922 for node, disk in device.ComputeNodeTree(instance.primary_node):
2923 cfg.SetDiskID(disk, node)
2924 if not rpc.call_blockdev_remove(node, disk):
2925 logger.Error("could not remove block device %s on node %s,"
2926 " continuing anyway" %
2927 (device.iv_name, node))
2930 if instance.disk_template == constants.DT_FILE:
2931 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2932 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2934 logger.Error("could not remove directory '%s'" % file_storage_dir)
2940 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2941 """Compute disk size requirements in the volume group
2943 This is currently hard-coded for the two-drive layout.
2946 # Required free disk space as a function of disk and swap space
2948 constants.DT_DISKLESS: None,
2949 constants.DT_PLAIN: disk_size + swap_size,
2950 # 256 MB are added for drbd metadata, 128 MB for each drbd device
2951 constants.DT_DRBD8: disk_size + swap_size + 256,
2952 constants.DT_FILE: None,
2955 if disk_template not in req_size_dict:
2956 raise errors.ProgrammerError("Disk template '%s' size requirement"
2957 " is unknown" % disk_template)
2959 return req_size_dict[disk_template]
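# Worked example: with disk_size=10240 and swap_size=4096 the volume group
# must provide:
#   DT_PLAIN: 10240 + 4096       = 14336 MiB
#   DT_DRBD8: 10240 + 4096 + 256 = 14592 MiB (two 128 MiB DRBD meta LVs)
# DT_DISKLESS and DT_FILE yield None: nothing is allocated from the VG.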
2962 class LUCreateInstance(LogicalUnit):
2963 """Create an instance.
2966 HPATH = "instance-add"
2967 HTYPE = constants.HTYPE_INSTANCE
2968 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2969 "disk_template", "swap_size", "mode", "start", "vcpus",
2970 "wait_for_sync", "ip_check", "mac"]
2972 def _RunAllocator(self):
2973 """Run the allocator based on input opcode.
2976 disks = [{"size": self.op.disk_size, "mode": "w"},
2977 {"size": self.op.swap_size, "mode": "w"}]
2978 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2979 "bridge": self.op.bridge}]
2980 ial = IAllocator(self.cfg, self.sstore,
2981 mode=constants.IALLOCATOR_MODE_ALLOC,
2982 name=self.op.instance_name,
2983 disk_template=self.op.disk_template,
2986 vcpus=self.op.vcpus,
2987 mem_size=self.op.mem_size,
2992 ial.Run(self.op.iallocator)
2995 raise errors.OpPrereqError("Can't compute nodes using"
2996 " iallocator '%s': %s" % (self.op.iallocator,
2998 if len(ial.nodes) != ial.required_nodes:
2999 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3000 " of nodes (%s), required %s" %
3001 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3002 self.op.pnode = ial.nodes[0]
3003 logger.ToStdout("Selected nodes for the instance: %s" %
3004 (", ".join(ial.nodes),))
3005 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
3006 (self.op.instance_name, self.op.iallocator, ial.nodes))
3007 if ial.required_nodes == 2:
3008 self.op.snode = ial.nodes[1]
3010 def BuildHooksEnv(self):
3013 This runs on master, primary and secondary nodes of the instance.
3017 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
3018 "INSTANCE_DISK_SIZE": self.op.disk_size,
3019 "INSTANCE_SWAP_SIZE": self.op.swap_size,
3020 "INSTANCE_ADD_MODE": self.op.mode,
3022 if self.op.mode == constants.INSTANCE_IMPORT:
3023 env["INSTANCE_SRC_NODE"] = self.op.src_node
3024 env["INSTANCE_SRC_PATH"] = self.op.src_path
3025 env["INSTANCE_SRC_IMAGE"] = self.src_image
3027 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
3028 primary_node=self.op.pnode,
3029 secondary_nodes=self.secondaries,
3030 status=self.instance_status,
3031 os_type=self.op.os_type,
3032 memory=self.op.mem_size,
3033 vcpus=self.op.vcpus,
3034 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
3037 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
3042 def CheckPrereq(self):
3043 """Check prerequisites.
3046 # set optional parameters to None if they don't exist
3047 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3048 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3049 "vnc_bind_address"]:
3050 if not hasattr(self.op, attr):
3051 setattr(self.op, attr, None)
3053 if self.op.mode not in (constants.INSTANCE_CREATE,
3054 constants.INSTANCE_IMPORT):
3055 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3058 if (not self.cfg.GetVGName() and
3059 self.op.disk_template not in constants.DTS_NOT_LVM):
3060 raise errors.OpPrereqError("Cluster does not support lvm-based"
3063 if self.op.mode == constants.INSTANCE_IMPORT:
3064 src_node = getattr(self.op, "src_node", None)
3065 src_path = getattr(self.op, "src_path", None)
3066 if src_node is None or src_path is None:
3067 raise errors.OpPrereqError("Importing an instance requires source"
3068 " node and path options")
3069 src_node_full = self.cfg.ExpandNodeName(src_node)
3070 if src_node_full is None:
3071 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3072 self.op.src_node = src_node = src_node_full
3074 if not os.path.isabs(src_path):
3075 raise errors.OpPrereqError("The source path must be absolute")
3077 export_info = rpc.call_export_info(src_node, src_path)
3080 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3082 if not export_info.has_section(constants.INISECT_EXP):
3083 raise errors.ProgrammerError("Corrupted export config")
3085 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3086 if (int(ei_version) != constants.EXPORT_VERSION):
3087 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3088 (ei_version, constants.EXPORT_VERSION))
3090 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3091 raise errors.OpPrereqError("Can't import instance with more than"
3094 # FIXME: are the old os-es, disk sizes, etc. useful?
3095 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3096 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3098 self.src_image = diskimage
3099 else: # INSTANCE_CREATE
3100 if getattr(self.op, "os_type", None) is None:
3101 raise errors.OpPrereqError("No guest OS specified")
3103 #### instance parameters check
3105 # disk template and mirror node verification
3106 if self.op.disk_template not in constants.DISK_TEMPLATES:
3107 raise errors.OpPrereqError("Invalid disk template name")
3109 # instance name verification
3110 hostname1 = utils.HostInfo(self.op.instance_name)
3112 self.op.instance_name = instance_name = hostname1.name
3113 instance_list = self.cfg.GetInstanceList()
3114 if instance_name in instance_list:
3115 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3118 # ip validity checks
3119 ip = getattr(self.op, "ip", None)
3120 if ip is None or ip.lower() == "none":
3122 elif ip.lower() == "auto":
3123 inst_ip = hostname1.ip
3125 if not utils.IsValidIP(ip):
3126 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3127 " like a valid IP" % ip)
3129 self.inst_ip = self.op.ip = inst_ip
3131 if self.op.start and not self.op.ip_check:
3132 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3133 " adding an instance in start mode")
3135 if self.op.ip_check:
3136 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3137 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3138 (hostname1.ip, instance_name))
3140 # MAC address verification
3141 if self.op.mac != "auto":
3142 if not utils.IsValidMac(self.op.mac.lower()):
3143 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3146 # bridge verification
3147 bridge = getattr(self.op, "bridge", None)
3149 self.op.bridge = self.cfg.GetDefBridge()
3151 self.op.bridge = bridge
3153 # boot order verification
3154 if self.op.hvm_boot_order is not None:
3155 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3156 raise errors.OpPrereqError("invalid boot order specified,"
3157 " must be one or more of [acdn]")
3158 # file storage checks
3159 if (self.op.file_driver and
3160 not self.op.file_driver in constants.FILE_DRIVER):
3161 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3162 self.op.file_driver)
3164 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3165 raise errors.OpPrereqError("File storage directory not a relative"
3169 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3170 raise errors.OpPrereqError("One and only one of iallocator and primary"
3171 " node must be given")
3173 if self.op.iallocator is not None:
3174 self._RunAllocator()
3176 #### node related checks
3178 # check primary node
3179 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3181 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3183 self.op.pnode = pnode.name
3185 self.secondaries = []
3187 # mirror node verification
3188 if self.op.disk_template in constants.DTS_NET_MIRROR:
3189 if getattr(self.op, "snode", None) is None:
3190 raise errors.OpPrereqError("The networked disk templates need"
3193 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3194 if snode_name is None:
3195 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3197 elif snode_name == pnode.name:
3198 raise errors.OpPrereqError("The secondary node cannot be"
3199 " the primary node.")
3200 self.secondaries.append(snode_name)
3202 req_size = _ComputeDiskSize(self.op.disk_template,
3203 self.op.disk_size, self.op.swap_size)
3205 # Check lv size requirements
3206 if req_size is not None:
3207 nodenames = [pnode.name] + self.secondaries
3208 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3209 for node in nodenames:
3210 info = nodeinfo.get(node, None)
3212 raise errors.OpPrereqError("Cannot get current information"
3213 " from node '%s'" % nodeinfo)
3214 vg_free = info.get('vg_free', None)
3215 if not isinstance(vg_free, int):
3216 raise errors.OpPrereqError("Can't compute free disk space on"
3218 if req_size > vg_free:
3219 raise errors.OpPrereqError("Not enough disk space on target node %s."
3220 " %d MB available, %d MB required" %
3221 (node, vg_free, req_size))
3224 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3226 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3227 " primary node" % self.op.os_type)
3229 if self.op.kernel_path == constants.VALUE_NONE:
3230 raise errors.OpPrereqError("Can't set instance kernel to none")
3233 # bridge check on primary node
3234 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3235 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3236 " destination node '%s'" %
3237 (self.op.bridge, pnode.name))
3239 # memory check on primary node
3241 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3242 "creating instance %s" % self.op.instance_name,
3245 # hvm_cdrom_image_path verification
3246 if self.op.hvm_cdrom_image_path is not None:
3247 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3248 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3249 " be an absolute path or None, not %s" %
3250 self.op.hvm_cdrom_image_path)
3251 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3252 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3253 " regular file or a symlink pointing to"
3254 " an existing regular file, not %s" %
3255 self.op.hvm_cdrom_image_path)
3257 # vnc_bind_address verification
3258 if self.op.vnc_bind_address is not None:
3259 if not utils.IsValidIP(self.op.vnc_bind_address):
3260 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3261 " like a valid IP address" %
3262 self.op.vnc_bind_address)
3265 self.instance_status = 'up'
3267 self.instance_status = 'down'
3269 def Exec(self, feedback_fn):
3270 """Create and add the instance to the cluster.
3273 instance = self.op.instance_name
3274 pnode_name = self.pnode.name
3276 if self.op.mac == "auto":
3277 mac_address = self.cfg.GenerateMAC()
3279 mac_address = self.op.mac
3281 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3282 if self.inst_ip is not None:
3283 nic.ip = self.inst_ip
3285 ht_kind = self.sstore.GetHypervisorType()
3286 if ht_kind in constants.HTS_REQ_PORT:
3287 network_port = self.cfg.AllocatePort()
3291 if self.op.vnc_bind_address is None:
3292 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3294 # this is needed because os.path.join does not accept None arguments
3295 if self.op.file_storage_dir is None:
3296 string_file_storage_dir = ""
3298 string_file_storage_dir = self.op.file_storage_dir
3300 # build the full file storage dir path
3301 file_storage_dir = os.path.normpath(os.path.join(
3302 self.sstore.GetFileStorageDir(),
3303 string_file_storage_dir, instance))
3306 disks = _GenerateDiskTemplate(self.cfg,
3307 self.op.disk_template,
3308 instance, pnode_name,
3309 self.secondaries, self.op.disk_size,
3312 self.op.file_driver)
3314 iobj = objects.Instance(name=instance, os=self.op.os_type,
3315 primary_node=pnode_name,
3316 memory=self.op.mem_size,
3317 vcpus=self.op.vcpus,
3318 nics=[nic], disks=disks,
3319 disk_template=self.op.disk_template,
3320 status=self.instance_status,
3321 network_port=network_port,
3322 kernel_path=self.op.kernel_path,
3323 initrd_path=self.op.initrd_path,
3324 hvm_boot_order=self.op.hvm_boot_order,
3325 hvm_acpi=self.op.hvm_acpi,
3326 hvm_pae=self.op.hvm_pae,
3327 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3328 vnc_bind_address=self.op.vnc_bind_address,
3331 feedback_fn("* creating instance disks...")
3332 if not _CreateDisks(self.cfg, iobj):
3333 _RemoveDisks(iobj, self.cfg)
3334 raise errors.OpExecError("Device creation failed, reverting...")
3336 feedback_fn("adding instance %s to cluster config" % instance)
3338 self.cfg.AddInstance(iobj)
3340 if self.op.wait_for_sync:
3341 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3342 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3343 # make sure the disks are not degraded (still sync-ing is ok)
3345 feedback_fn("* checking mirrors status")
3346 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3351 _RemoveDisks(iobj, self.cfg)
3352 self.cfg.RemoveInstance(iobj.name)
3353 raise errors.OpExecError("There are some degraded disks for"
3356 feedback_fn("creating os for instance %s on node %s" %
3357 (instance, pnode_name))
3359 if iobj.disk_template != constants.DT_DISKLESS:
3360 if self.op.mode == constants.INSTANCE_CREATE:
3361 feedback_fn("* running the instance OS create scripts...")
3362 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3363 raise errors.OpExecError("could not add os for instance %s"
3365 (instance, pnode_name))
3367 elif self.op.mode == constants.INSTANCE_IMPORT:
3368 feedback_fn("* running the instance OS import scripts...")
3369 src_node = self.op.src_node
3370 src_image = self.src_image
3371 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3372 src_node, src_image):
3373 raise errors.OpExecError("Could not import os for instance"
3375 (instance, pnode_name))
3377 # also checked in the prereq part
3378 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3382 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3383 feedback_fn("* starting instance...")
3384 if not rpc.call_instance_start(pnode_name, iobj, None):
3385 raise errors.OpExecError("Could not start instance")
3388 class LUConnectConsole(NoHooksLU):
3389 """Connect to an instance's console.
3391 This is somewhat special in that it returns the command line that
3392 you need to run on the master node in order to connect to the
3396 _OP_REQP = ["instance_name"]
3398 def CheckPrereq(self):
3399 """Check prerequisites.
3401 This checks that the instance is in the cluster.
3404 instance = self.cfg.GetInstanceInfo(
3405 self.cfg.ExpandInstanceName(self.op.instance_name))
3406 if instance is None:
3407 raise errors.OpPrereqError("Instance '%s' not known" %
3408 self.op.instance_name)
3409 self.instance = instance
3411 def Exec(self, feedback_fn):
3412 """Connect to the console of an instance
3415 instance = self.instance
3416 node = instance.primary_node
3418 node_insts = rpc.call_instance_list([node])[node]
3419 if node_insts is False:
3420 raise errors.OpExecError("Can't connect to node %s." % node)
3422 if instance.name not in node_insts:
3423 raise errors.OpExecError("Instance %s is not running." % instance.name)
3425 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3427 hyper = hypervisor.GetHypervisor()
3428 console_cmd = hyper.GetShellCommandForConsole(instance)
3431 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
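# The value returned above is the ssh invocation built by SshRunner (an
# argv-style command), roughly equivalent to the shell line below; the
# console command itself comes from the hypervisor, e.g. "xm console" for
# Xen (exact flags depend on SshRunner.BuildCmd):
#
#   ssh -t root@node1.example.com 'xm console inst1.example.com'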
3434 class LUReplaceDisks(LogicalUnit):
3435 """Replace the disks of an instance.
3438 HPATH = "mirrors-replace"
3439 HTYPE = constants.HTYPE_INSTANCE
3440 _OP_REQP = ["instance_name", "mode", "disks"]
3442 def _RunAllocator(self):
3443 """Compute a new secondary node using an IAllocator.
3446 ial = IAllocator(self.cfg, self.sstore,
3447 mode=constants.IALLOCATOR_MODE_RELOC,
3448 name=self.op.instance_name,
3449 relocate_from=[self.sec_node])
3451 ial.Run(self.op.iallocator)
3454 raise errors.OpPrereqError("Can't compute nodes using"
3455 " iallocator '%s': %s" % (self.op.iallocator,
3457 if len(ial.nodes) != ial.required_nodes:
3458 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3459 " of nodes (%s), required %s" %
3460 (self.op.iallocator, len(ial.nodes), ial.required_nodes))
3461 self.op.remote_node = ial.nodes[0]
3462 logger.ToStdout("Selected new secondary for the instance: %s" %
3463 self.op.remote_node)
3465 def BuildHooksEnv(self):
3468 This runs on the master, the primary and all the secondaries.
3472 "MODE": self.op.mode,
3473 "NEW_SECONDARY": self.op.remote_node,
3474 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3476 env.update(_BuildInstanceHookEnvByObject(self.instance))
3478 self.sstore.GetMasterNode(),
3479 self.instance.primary_node,
3481 if self.op.remote_node is not None:
3482 nl.append(self.op.remote_node)
3485 def CheckPrereq(self):
3486 """Check prerequisites.
3488 This checks that the instance is in the cluster.
3491 if not hasattr(self.op, "remote_node"):
3492 self.op.remote_node = None
3494 instance = self.cfg.GetInstanceInfo(
3495 self.cfg.ExpandInstanceName(self.op.instance_name))
3496 if instance is None:
3497 raise errors.OpPrereqError("Instance '%s' not known" %
3498 self.op.instance_name)
3499 self.instance = instance
3500 self.op.instance_name = instance.name
3502 if instance.disk_template not in constants.DTS_NET_MIRROR:
3503 raise errors.OpPrereqError("Instance's disk layout is not"
3504 " network mirrored.")
3506 if len(instance.secondary_nodes) != 1:
3507 raise errors.OpPrereqError("The instance has a strange layout,"
3508 " expected one secondary but found %d" %
3509 len(instance.secondary_nodes))
3511 self.sec_node = instance.secondary_nodes[0]
3513 ia_name = getattr(self.op, "iallocator", None)
3514 if ia_name is not None:
3515 if self.op.remote_node is not None:
3516 raise errors.OpPrereqError("Give either the iallocator or the new"
3517 " secondary, not both")
3518 self.op.remote_node = self._RunAllocator()
3520 remote_node = self.op.remote_node
3521 if remote_node is not None:
3522 remote_node = self.cfg.ExpandNodeName(remote_node)
3523 if remote_node is None:
3524 raise errors.OpPrereqError("Node '%s' not known" %
3525 self.op.remote_node)
3526 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3528 self.remote_node_info = None
3529 if remote_node == instance.primary_node:
3530 raise errors.OpPrereqError("The specified node is the primary node of"
3532 elif remote_node == self.sec_node:
3533 if self.op.mode == constants.REPLACE_DISK_SEC:
3534 # this is for DRBD8, where we can't execute the same mode of
3535 # replacement as for drbd7 (no different port allocated)
3536 raise errors.OpPrereqError("Same secondary given, cannot execute"
3538 if instance.disk_template == constants.DT_DRBD8:
3539 if (self.op.mode == constants.REPLACE_DISK_ALL and
3540 remote_node is not None):
3541 # switch to replace secondary mode
3542 self.op.mode = constants.REPLACE_DISK_SEC
3544 if self.op.mode == constants.REPLACE_DISK_ALL:
3545 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3546 " secondary disk replacement, not"
3548 elif self.op.mode == constants.REPLACE_DISK_PRI:
3549 if remote_node is not None:
3550 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3551 " the secondary while doing a primary"
3552 " node disk replacement")
3553 self.tgt_node = instance.primary_node
3554 self.oth_node = instance.secondary_nodes[0]
3555 elif self.op.mode == constants.REPLACE_DISK_SEC:
3556 self.new_node = remote_node # this can be None, in which case
3557 # we don't change the secondary
3558 self.tgt_node = instance.secondary_nodes[0]
3559 self.oth_node = instance.primary_node
3561 raise errors.ProgrammerError("Unhandled disk replace mode")
3563 for name in self.op.disks:
3564 if instance.FindDisk(name) is None:
3565 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3566 (name, instance.name))
3567 self.op.remote_node = remote_node
3569 def _ExecD8DiskOnly(self, feedback_fn):
3570 """Replace a disk on the primary or secondary for dbrd8.
3572 The algorithm for replace is quite complicated:
3573 - for each disk to be replaced:
3574 - create new LVs on the target node with unique names
3575 - detach old LVs from the drbd device
3576 - rename old LVs to name_replaced.<time_t>
3577 - rename new LVs to old LVs
3578 - attach the new LVs (with the old names now) to the drbd device
3579 - wait for sync across all devices
3580 - for each modified disk:
3581 - remove old LVs (which have the name name_replaced.<time_t>)
3583 Failures are not very well handled.
3587 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3588 instance = self.instance
3590 vgname = self.cfg.GetVGName()
3593 tgt_node = self.tgt_node
3594 oth_node = self.oth_node
3596 # Step: check device activation
3597 self.proc.LogStep(1, steps_total, "check device existence")
3598 info("checking volume groups")
3599 my_vg = cfg.GetVGName()
3600 results = rpc.call_vg_list([oth_node, tgt_node])
3602 raise errors.OpExecError("Can't list volume groups on the nodes")
3603 for node in oth_node, tgt_node:
3604 res = results.get(node, False)
3605 if not res or my_vg not in res:
3606 raise errors.OpExecError("Volume group '%s' not found on %s" %
3608 for dev in instance.disks:
3609 if not dev.iv_name in self.op.disks:
3611 for node in tgt_node, oth_node:
3612 info("checking %s on %s" % (dev.iv_name, node))
3613 cfg.SetDiskID(dev, node)
3614 if not rpc.call_blockdev_find(node, dev):
3615 raise errors.OpExecError("Can't find device %s on node %s" %
3616 (dev.iv_name, node))
3618 # Step: check other node consistency
3619 self.proc.LogStep(2, steps_total, "check peer consistency")
3620 for dev in instance.disks:
3621 if not dev.iv_name in self.op.disks:
3623 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3624 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3625 oth_node==instance.primary_node):
3626 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3627 " to replace disks on this node (%s)" %
3628 (oth_node, tgt_node))
3630 # Step: create new storage
3631 self.proc.LogStep(3, steps_total, "allocate new storage")
3632 for dev in instance.disks:
3633 if not dev.iv_name in self.op.disks:
3636 cfg.SetDiskID(dev, tgt_node)
3637 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3638 names = _GenerateUniqueNames(cfg, lv_names)
3639 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3640 logical_id=(vgname, names[0]))
3641 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3642 logical_id=(vgname, names[1]))
3643 new_lvs = [lv_data, lv_meta]
3644 old_lvs = dev.children
3645 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3646 info("creating new local storage on %s for %s" %
3647 (tgt_node, dev.iv_name))
3648 # since we *always* want to create this LV, we use the
3649 # _Create...OnPrimary (which forces the creation), even if we
3650 # are talking about the secondary node
3651 for new_lv in new_lvs:
3652 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3653 _GetInstanceInfoText(instance)):
3654 raise errors.OpExecError("Failed to create new LV named '%s' on"
3656 (new_lv.logical_id[1], tgt_node))
3658 # Step: for each lv, detach+rename*2+attach
3659 self.proc.LogStep(4, steps_total, "change drbd configuration")
3660 for dev, old_lvs, new_lvs in iv_names.itervalues():
3661 info("detaching %s drbd from local storage" % dev.iv_name)
3662 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3663 raise errors.OpExecError("Can't detach drbd from local storage on node"
3664 " %s for device %s" % (tgt_node, dev.iv_name))
3666 #cfg.Update(instance)
3668 # ok, we created the new LVs, so now we know we have the needed
3669 # storage; as such, we proceed on the target node to rename
3670 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3671 # using the assumption that logical_id == physical_id (which in
3672 # turn is the unique_id on that node)
3674 # FIXME(iustin): use a better name for the replaced LVs
3675 temp_suffix = int(time.time())
3676 ren_fn = lambda d, suff: (d.physical_id[0],
3677 d.physical_id[1] + "_replaced-%s" % suff)
3678 # build the rename list based on what LVs exist on the node
3680 for to_ren in old_lvs:
3681 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3682 if find_res is not None: # device exists
3683 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3685 info("renaming the old LVs on the target node")
3686 if not rpc.call_blockdev_rename(tgt_node, rlist):
3687 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3688 # now we rename the new LVs to the old LVs
3689 info("renaming the new LVs on the target node")
3690 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3691 if not rpc.call_blockdev_rename(tgt_node, rlist):
3692 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3694 for old, new in zip(old_lvs, new_lvs):
3695 new.logical_id = old.logical_id
3696 cfg.SetDiskID(new, tgt_node)
3698 for disk in old_lvs:
3699 disk.logical_id = ren_fn(disk, temp_suffix)
3700 cfg.SetDiskID(disk, tgt_node)
3702 # now that the new lvs have the old name, we can add them to the device
3703 info("adding new mirror component on %s" % tgt_node)
3704 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3705 for new_lv in new_lvs:
3706 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3707 warning("Can't rollback device %s", hint="manually cleanup unused"
3709 raise errors.OpExecError("Can't add local storage to drbd")
3711 dev.children = new_lvs
3712 cfg.Update(instance)
3714 # Step: wait for sync
3716 # this can fail as the old devices are degraded and _WaitForSync
3717 # does a combined result over all disks, so we don't check its
3719 self.proc.LogStep(5, steps_total, "sync devices")
3720 _WaitForSync(cfg, instance, self.proc, unlock=True)
3722 # so check manually all the devices
3723 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3724 cfg.SetDiskID(dev, instance.primary_node)
3725 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3727 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3729 # Step: remove old storage
3730 self.proc.LogStep(6, steps_total, "removing old storage")
3731 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3732 info("remove logical volumes for %s" % name)
3734 cfg.SetDiskID(lv, tgt_node)
3735 if not rpc.call_blockdev_remove(tgt_node, lv):
3736 warning("Can't remove old LV", hint="manually remove unused LVs")
3739 def _ExecD8Secondary(self, feedback_fn):
3740 """Replace the secondary node for drbd8.
3742 The algorithm for replace is quite complicated:
3743 - for all disks of the instance:
3744 - create new LVs on the new node with same names
3745 - shutdown the drbd device on the old secondary
3746 - disconnect the drbd network on the primary
3747 - create the drbd device on the new secondary
3748 - network attach the drbd on the primary, using an artifice:
3749 the drbd code for Attach() will connect to the network if it
3750 finds a device which is connected to the good local disks but
3752 - wait for sync across all devices
3753 - remove all disks from the old secondary
3755 Failures are not very well handled.
3759 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3760 instance = self.instance
3762 vgname = self.cfg.GetVGName()
3765 old_node = self.tgt_node
3766 new_node = self.new_node
3767 pri_node = instance.primary_node
3769 # Step: check device activation
3770 self.proc.LogStep(1, steps_total, "check device existence")
3771 info("checking volume groups")
3772 my_vg = cfg.GetVGName()
3773 results = rpc.call_vg_list([pri_node, new_node])
3775 raise errors.OpExecError("Can't list volume groups on the nodes")
3776 for node in pri_node, new_node:
3777 res = results.get(node, False)
3778 if not res or my_vg not in res:
3779 raise errors.OpExecError("Volume group '%s' not found on %s" %
3781 for dev in instance.disks:
3782 if not dev.iv_name in self.op.disks:
3784 info("checking %s on %s" % (dev.iv_name, pri_node))
3785 cfg.SetDiskID(dev, pri_node)
3786 if not rpc.call_blockdev_find(pri_node, dev):
3787 raise errors.OpExecError("Can't find device %s on node %s" %
3788 (dev.iv_name, pri_node))
3790 # Step: check other node consistency
3791 self.proc.LogStep(2, steps_total, "check peer consistency")
3792 for dev in instance.disks:
3793 if not dev.iv_name in self.op.disks:
3795 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3796 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3797 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3798 " unsafe to replace the secondary" %
3801 # Step: create new storage
3802 self.proc.LogStep(3, steps_total, "allocate new storage")
3803 for dev in instance.disks:
3805 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3806 # since we *always* want to create this LV, we use the
3807 # _Create...OnPrimary (which forces the creation), even if we
3808 # are talking about the secondary node
3809 for new_lv in dev.children:
3810 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3811 _GetInstanceInfoText(instance)):
3812 raise errors.OpExecError("Failed to create new LV named '%s' on"
3814 (new_lv.logical_id[1], new_node))
3816 iv_names[dev.iv_name] = (dev, dev.children)
3818 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3819 for dev in instance.disks:
3821 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3822 # create new devices on new_node
3823 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3824 logical_id=(pri_node, new_node,
3826 children=dev.children)
3827 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3829 _GetInstanceInfoText(instance)):
3830 raise errors.OpExecError("Failed to create new DRBD on"
3831 " node '%s'" % new_node)
3833 for dev in instance.disks:
3834 # we have new devices, shutdown the drbd on the old secondary
3835 info("shutting down drbd for %s on old node" % dev.iv_name)
3836 cfg.SetDiskID(dev, old_node)
3837 if not rpc.call_blockdev_shutdown(old_node, dev):
3838 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3839 hint="Please cleanup this device manually as soon as possible")
3841 info("detaching primary drbds from the network (=> standalone)")
3843 for dev in instance.disks:
3844 cfg.SetDiskID(dev, pri_node)
3845 # set the physical (unique in bdev terms) id to None, meaning
3846 # detach from network
3847 dev.physical_id = (None,) * len(dev.physical_id)
3848 # and 'find' the device, which will 'fix' it to match the
3850 if rpc.call_blockdev_find(pri_node, dev):
3853 warning("Failed to detach drbd %s from network, unusual case" %
3857 # no detaches succeeded (very unlikely)
3858 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3860 # if we managed to detach at least one, we update all the disks of
3861 # the instance to point to the new secondary
3862 info("updating instance configuration")
3863 for dev in instance.disks:
3864 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3865 cfg.SetDiskID(dev, pri_node)
3866 cfg.Update(instance)
3868 # and now perform the drbd attach
3869 info("attaching primary drbds to new secondary (standalone => connected)")
3871 for dev in instance.disks:
3872 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3873 # since the attach is smart, it's enough to 'find' the device,
3874 # it will automatically activate the network, if the physical_id
3876 cfg.SetDiskID(dev, pri_node)
3877 if not rpc.call_blockdev_find(pri_node, dev):
3878 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3879 "please do a gnt-instance info to see the status of disks")
3881 # this can fail as the old devices are degraded and _WaitForSync
3882 # does a combined result over all disks, so we don't check its
3884 self.proc.LogStep(5, steps_total, "sync devices")
3885 _WaitForSync(cfg, instance, self.proc, unlock=True)
3887 # so check manually all the devices
3888 for name, (dev, old_lvs) in iv_names.iteritems():
3889 cfg.SetDiskID(dev, pri_node)
3890 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3892 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3894 self.proc.LogStep(6, steps_total, "removing old storage")
3895 for name, (dev, old_lvs) in iv_names.iteritems():
3896 info("remove logical volumes for %s" % name)
3898 cfg.SetDiskID(lv, old_node)
3899 if not rpc.call_blockdev_remove(old_node, lv):
3900 warning("Can't remove LV on old secondary",
3901 hint="Cleanup stale volumes by hand")
3903 def Exec(self, feedback_fn):
3904 """Execute disk replacement.
3906 This dispatches the disk replacement to the appropriate handler.
3909 instance = self.instance
3910 if instance.disk_template == constants.DT_DRBD8:
3911 if self.op.remote_node is None:
3912 fn = self._ExecD8DiskOnly
3914 fn = self._ExecD8Secondary
3916 raise errors.ProgrammerError("Unhandled disk replacement case")
3917 return fn(feedback_fn)


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }

    return data
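
  # Illustrative sketch, not part of the original code: a hypothetical value
  # returned by _ComputeDiskStatus for a plain LV disk (all values made up);
  # "pstatus"/"sstatus" carry the raw rpc.call_blockdev_find results.
  _EXAMPLE_DISK_STATUS = {
    "iv_name": "sda",
    "dev_type": constants.LD_LV,
    "logical_id": ("xenvg", "0c3f3f44-disk0"),   # hypothetical LV name
    "physical_id": ("xenvg", "0c3f3f44-disk0"),  # as set by cfg.SetDiskID
    "pstatus": None,  # find result from the primary node
    "sstatus": None,  # None here: LV disks have no secondary
    "children": [],   # recursive entries of this same shape
    }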

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result
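
  # For illustration only (hypothetical values): one entry of the mapping
  # returned by Exec above.  The hypervisor-specific keys (kernel_path,
  # hvm_*, vnc_bind_address, network_port) appear only when the cluster's
  # hypervisor type calls for them.
  #
  #   "inst1.example.com": {
  #     "name": "inst1.example.com",
  #     "config_state": "up",
  #     "run_state": "down",
  #     "pnode": "node1.example.com",
  #     "snodes": ["node2.example.com"],
  #     "os": "debian-etch",
  #     "memory": 512,
  #     "nics": [("aa:00:00:fa:3a:3f", "192.0.2.10", "xen-br0")],
  #     "disks": [...],  # as built by _ComputeDiskStatus
  #     "vcpus": 1,
  #   }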


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("No such instance name '%s'" %
                                 self.op.instance_name)
    self.op.instance_name = instance.name
    self.instance = instance

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.
    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.AddInstance(instance)

    return result
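
  # For illustration (hypothetical values): Exec returns a list of
  # (parameter, new value) pairs describing only what actually changed, e.g.
  #   [("mem", 1024), ("vcpus", 2), ("bridge", "xen-br1")]
  # which callers can format for display.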


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ["nodes"]

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
    return rpc.call_export_list(self.nodes)
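
  # For illustration (hypothetical values): the RPC result maps each node
  # name to the list of instance exports found on it, e.g.
  #   {"node1.example.com": ["inst1.example.com"],
  #    "node2.example.com": []}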


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return self.target.GetTags()


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
    return results
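

# A standalone, illustrative restatement of the search loop in
# LUSearchTags.Exec, decoupled from the cluster configuration.  The helper
# name and the (path, tags) input shape are hypothetical, not module API.
def _ExampleSearchTags(pattern, tgts):
  """Return the (path, tag) pairs whose tag matches pattern (illustrative).

  >>> _ExampleSearchTags("^web", [("/cluster", ["webfarm"]),
  ...                             ("/nodes/n1", ["rack:r1"])])
  [('/cluster', 'webfarm')]

  """
  regex = re.compile(pattern)
  results = []
  for path, tags in tgts:
    for tag in tags:
      if regex.search(tag):
        results.append((path, tag))
  return results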


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      diff_names.sort()
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")
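
# For illustration: the frozenset arithmetic used in LUDelTags.CheckPrereq,
# in isolation (hypothetical values):
#   >>> del_tags = frozenset(["a", "b"])
#   >>> del_tags <= frozenset(["a", "b", "c"])   # all tags present
#   True
#   >>> del_tags - frozenset(["a"])              # the missing ones
#   frozenset(['b'])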


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have a good list of nodes and/or the duration
    is valid.

    """

    if self.op.on_nodes:
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))
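
# Hypothetical usage sketch (illustrative, not part of the module): an
# opcode that sleeps five seconds on the master only would look roughly like
#   op = opcodes.OpTestDelay(duration=5.0, on_master=True, on_nodes=[])
# and would be submitted through the usual opcode processor.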


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _ALLO_KEYS/_RELO_KEYS class
      attributes are required for the respective mode)
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
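
  # Hypothetical construction sketch (illustrative values only), as a
  # calling LU might perform it for an allocation request:
  #   ial = IAllocator(self.cfg, self.sstore,
  #                    mode=constants.IALLOCATOR_MODE_ALLOC,
  #                    name="inst1.example.com",
  #                    mem_size=512,
  #                    disks=[{"size": 1024, "mode": "w"},
  #                           {"size": 4096, "mode": "w"}],
  #                    disk_template=constants.DT_DRBD8,
  #                    os="debian-etch", tags=[], nics=[], vcpus=1)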

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data

    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request
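
  # Illustrative serialized form of the request built above, under the
  # assumption of a two-disk DRBD instance; all values are made up, and
  # disk_space_total is whatever _ComputeDiskSize returns for the two disk
  # sizes (the mirrored template adds metadata overhead on top of
  # 1024 + 4096 MB here):
  #   "request": {
  #     "type": "allocate",
  #     "name": "inst1.example.com",
  #     "disk_template": "drbd",
  #     "memory": 512,
  #     "vcpus": 1,
  #     "disks": [{"size": 1024, "mode": "w"}, {"size": 4096, "mode": "w"}],
  #     "disk_space_total": 5376,
  #     "required_nodes": 2,
  #     ...
  #   }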

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _ComputeClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " is not a list")
    self.out_data = rdict
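
  # For illustration: a well-formed allocator reply, as accepted by
  # _ValidateResult above (values are hypothetical):
  #   {"success": true,
  #    "info": "allocation successful",
  #    "nodes": ["node1.example.com", "node2.example.com"]}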


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text
    return result