4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the master-side code."""
24 # pylint: disable-msg=W0613,W0201
34 from ganeti import rpc
35 from ganeti import ssh
36 from ganeti import logger
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import config
42 from ganeti import constants
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import ssconf
46 from ganeti import serializer
49 class LogicalUnit(object):
50 """Logical Unit base class.
52 Subclasses must follow these rules:
53 - implement ExpandNames
54 - implement CheckPrereq
56 - implement BuildHooksEnv
57 - redefine HPATH and HTYPE
58 - optionally redefine their run requirements:
59 REQ_MASTER: the LU needs to run on the master node
60 REQ_WSSTORE: the LU needs a writable SimpleStore
61 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
63 Note that all commands require root permissions.
73 def __init__(self, processor, op, context, sstore):
74 """Constructor for LogicalUnit.
76 This needs to be overriden in derived classes in order to check op
82 self.cfg = context.cfg
84 self.context = context
85 self.needed_locks = None
88 for attr_name in self._OP_REQP:
89 attr_val = getattr(op, attr_name, None)
91 raise errors.OpPrereqError("Required parameter '%s' missing" %
94 if not self.cfg.IsCluster():
95 raise errors.OpPrereqError("Cluster not initialized yet,"
96 " use 'gnt-cluster init' first.")
98 master = sstore.GetMasterNode()
99 if master != utils.HostInfo().name:
100 raise errors.OpPrereqError("Commands must be run on the master"
104 """Returns the SshRunner object
108 self.__ssh = ssh.SshRunner(self.sstore)
111 ssh = property(fget=__GetSSH)
113 def ExpandNames(self):
114 """Expand names for this LU.
116 This method is called before starting to execute the opcode, and it should
117 update all the parameters of the opcode to their canonical form (e.g. a
118 short node name must be fully expanded after this method has successfully
119 completed). This way locking, hooks, logging, ecc. can work correctly.
121 LUs which implement this method must also populate the self.needed_locks
122 member, as a dict with lock levels as keys, and a list of needed lock names
124 - Use an empty dict if you don't need any lock
125 - If you don't need any lock at a particular level omit that level
126 - Don't put anything for the BGL level
127 - If you want all locks at a level use None as a value
128 (this reflects what LockSet does, and will be replaced before
129 CheckPrereq with the full list of nodes that have been locked)
132 # Acquire all nodes and one instance
133 self.needed_locks = {
134 locking.LEVEL_NODE: None,
135 locking.LEVEL_INSTANCES: ['instance1.example.tld'],
137 # Acquire just two nodes
138 self.needed_locks = {
139 locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
142 self.needed_locks = {} # No, you can't leave it to the default value None
145 # The implementation of this method is mandatory only if the new LU is
146 # concurrent, so that old LUs don't need to be changed all at the same
149 self.needed_locks = {} # Exclusive LUs don't need locks.
151 raise NotImplementedError
153 def CheckPrereq(self):
154 """Check prerequisites for this LU.
156 This method should check that the prerequisites for the execution
157 of this LU are fulfilled. It can do internode communication, but
158 it should be idempotent - no cluster or system changes are
161 The method should raise errors.OpPrereqError in case something is
162 not fulfilled. Its return value is ignored.
164 This method should also update all the parameters of the opcode to
165 their canonical form if it hasn't been done by ExpandNames before.
168 raise NotImplementedError
170 def Exec(self, feedback_fn):
173 This method should implement the actual work. It should raise
174 errors.OpExecError for failures that are somewhat dealt with in
178 raise NotImplementedError
180 def BuildHooksEnv(self):
181 """Build hooks environment for this LU.
183 This method should return a three-node tuple consisting of: a dict
184 containing the environment that will be used for running the
185 specific hook for this LU, a list of node names on which the hook
186 should run before the execution, and a list of node names on which
187 the hook should run after the execution.
189 The keys of the dict must not have 'GANETI_' prefixed as this will
190 be handled in the hooks runner. Also note additional keys will be
191 added by the hooks runner. If the LU doesn't define any
192 environment, an empty dict (and not None) should be returned.
194 No nodes should be returned as an empty list (and not None).
196 Note that if the HPATH for a LU class is None, this function will
200 raise NotImplementedError
202 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
203 """Notify the LU about the results of its hooks.
205 This method is called every time a hooks phase is executed, and notifies
206 the Logical Unit about the hooks' result. The LU can then use it to alter
207 its result based on the hooks. By default the method does nothing and the
208 previous result is passed back unchanged but any LU can define it if it
209 wants to use the local cluster hook-scripts somehow.
212 phase: the hooks phase that has just been run
213 hooks_results: the results of the multi-node hooks rpc call
214 feedback_fn: function to send feedback back to the caller
215 lu_result: the previous result this LU had, or None in the PRE phase.
220 def _ExpandAndLockInstance(self):
221 """Helper function to expand and lock an instance.
223 Many LUs that work on an instance take its name in self.op.instance_name
224 and need to expand it and then declare the expanded name for locking. This
225 function does it, and then updates self.op.instance_name to the expanded
226 name. It also initializes needed_locks as a dict, if this hasn't been done
230 if self.needed_locks is None:
231 self.needed_locks = {}
233 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
234 "_ExpandAndLockInstance called with instance-level locks set"
235 expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
236 if expanded_name is None:
237 raise errors.OpPrereqError("Instance '%s' not known" %
238 self.op.instance_name)
239 self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
240 self.op.instance_name = expanded_name
243 class NoHooksLU(LogicalUnit):
244 """Simple LU which runs no hooks.
246 This LU is intended as a parent for other LogicalUnits which will
247 run no hooks, in order to reduce duplicate code.
254 def _GetWantedNodes(lu, nodes):
255 """Returns list of checked and expanded node names.
258 nodes: List of nodes (strings) or None for all
261 if not isinstance(nodes, list):
262 raise errors.OpPrereqError("Invalid argument type 'nodes'")
268 node = lu.cfg.ExpandNodeName(name)
270 raise errors.OpPrereqError("No such node name '%s'" % name)
274 wanted = lu.cfg.GetNodeList()
275 return utils.NiceSort(wanted)
278 def _GetWantedInstances(lu, instances):
279 """Returns list of checked and expanded instance names.
282 instances: List of instances (strings) or None for all
285 if not isinstance(instances, list):
286 raise errors.OpPrereqError("Invalid argument type 'instances'")
291 for name in instances:
292 instance = lu.cfg.ExpandInstanceName(name)
294 raise errors.OpPrereqError("No such instance name '%s'" % name)
295 wanted.append(instance)
298 wanted = lu.cfg.GetInstanceList()
299 return utils.NiceSort(wanted)
302 def _CheckOutputFields(static, dynamic, selected):
303 """Checks whether all selected fields are valid.
306 static: Static fields
307 dynamic: Dynamic fields
310 static_fields = frozenset(static)
311 dynamic_fields = frozenset(dynamic)
313 all_fields = static_fields | dynamic_fields
315 if not all_fields.issuperset(selected):
316 raise errors.OpPrereqError("Unknown output fields selected: %s"
317 % ",".join(frozenset(selected).
318 difference(all_fields)))
321 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
322 memory, vcpus, nics):
323 """Builds instance related env variables for hooks from single variables.
326 secondary_nodes: List of secondary nodes as strings
330 "INSTANCE_NAME": name,
331 "INSTANCE_PRIMARY": primary_node,
332 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
333 "INSTANCE_OS_TYPE": os_type,
334 "INSTANCE_STATUS": status,
335 "INSTANCE_MEMORY": memory,
336 "INSTANCE_VCPUS": vcpus,
340 nic_count = len(nics)
341 for idx, (ip, bridge, mac) in enumerate(nics):
344 env["INSTANCE_NIC%d_IP" % idx] = ip
345 env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
346 env["INSTANCE_NIC%d_HWADDR" % idx] = mac
350 env["INSTANCE_NIC_COUNT"] = nic_count
355 def _BuildInstanceHookEnvByObject(instance, override=None):
356 """Builds instance related env variables for hooks from an object.
359 instance: objects.Instance object of instance
360 override: dict of values to override
363 'name': instance.name,
364 'primary_node': instance.primary_node,
365 'secondary_nodes': instance.secondary_nodes,
366 'os_type': instance.os,
367 'status': instance.os,
368 'memory': instance.memory,
369 'vcpus': instance.vcpus,
370 'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
373 args.update(override)
374 return _BuildInstanceHookEnv(**args)
377 def _CheckInstanceBridgesExist(instance):
378 """Check that the brigdes needed by an instance exist.
381 # check bridges existance
382 brlist = [nic.bridge for nic in instance.nics]
383 if not rpc.call_bridges_exist(instance.primary_node, brlist):
384 raise errors.OpPrereqError("one or more target bridges %s does not"
385 " exist on destination node '%s'" %
386 (brlist, instance.primary_node))
389 class LUDestroyCluster(NoHooksLU):
390 """Logical unit for destroying the cluster.
395 def CheckPrereq(self):
396 """Check prerequisites.
398 This checks whether the cluster is empty.
400 Any errors are signalled by raising errors.OpPrereqError.
403 master = self.sstore.GetMasterNode()
405 nodelist = self.cfg.GetNodeList()
406 if len(nodelist) != 1 or nodelist[0] != master:
407 raise errors.OpPrereqError("There are still %d node(s) in"
408 " this cluster." % (len(nodelist) - 1))
409 instancelist = self.cfg.GetInstanceList()
411 raise errors.OpPrereqError("There are still %d instance(s) in"
412 " this cluster." % len(instancelist))
414 def Exec(self, feedback_fn):
415 """Destroys the cluster.
418 master = self.sstore.GetMasterNode()
419 if not rpc.call_node_stop_master(master):
420 raise errors.OpExecError("Could not disable the master role")
421 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
422 utils.CreateBackup(priv_key)
423 utils.CreateBackup(pub_key)
424 rpc.call_node_leave_cluster(master)
427 class LUVerifyCluster(LogicalUnit):
428 """Verifies the cluster status.
431 HPATH = "cluster-verify"
432 HTYPE = constants.HTYPE_CLUSTER
433 _OP_REQP = ["skip_checks"]
435 def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
436 remote_version, feedback_fn):
437 """Run multiple tests against a node.
440 - compares ganeti version
441 - checks vg existance and size > 20G
442 - checks config file checksum
443 - checks ssh to other nodes
446 node: name of the node to check
447 file_list: required list of files
448 local_cksum: dictionary of local files and their checksums
451 # compares ganeti version
452 local_version = constants.PROTOCOL_VERSION
453 if not remote_version:
454 feedback_fn(" - ERROR: connection to %s failed" % (node))
457 if local_version != remote_version:
458 feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
459 (local_version, node, remote_version))
462 # checks vg existance and size > 20G
466 feedback_fn(" - ERROR: unable to check volume groups on node %s." %
470 vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
471 constants.MIN_VG_SIZE)
473 feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
476 # checks config file checksum
479 if 'filelist' not in node_result:
481 feedback_fn(" - ERROR: node hasn't returned file checksum data")
483 remote_cksum = node_result['filelist']
484 for file_name in file_list:
485 if file_name not in remote_cksum:
487 feedback_fn(" - ERROR: file '%s' missing" % file_name)
488 elif remote_cksum[file_name] != local_cksum[file_name]:
490 feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)
492 if 'nodelist' not in node_result:
494 feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
496 if node_result['nodelist']:
498 for node in node_result['nodelist']:
499 feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
500 (node, node_result['nodelist'][node]))
501 if 'node-net-test' not in node_result:
503 feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
505 if node_result['node-net-test']:
507 nlist = utils.NiceSort(node_result['node-net-test'].keys())
509 feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
510 (node, node_result['node-net-test'][node]))
512 hyp_result = node_result.get('hypervisor', None)
513 if hyp_result is not None:
514 feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
517 def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
518 node_instance, feedback_fn):
519 """Verify an instance.
521 This function checks to see if the required block devices are
522 available on the instance's node.
527 node_current = instanceconfig.primary_node
530 instanceconfig.MapLVsByNode(node_vol_should)
532 for node in node_vol_should:
533 for volume in node_vol_should[node]:
534 if node not in node_vol_is or volume not in node_vol_is[node]:
535 feedback_fn(" - ERROR: volume %s missing on node %s" %
539 if not instanceconfig.status == 'down':
540 if (node_current not in node_instance or
541 not instance in node_instance[node_current]):
542 feedback_fn(" - ERROR: instance %s not running on node %s" %
543 (instance, node_current))
546 for node in node_instance:
547 if (not node == node_current):
548 if instance in node_instance[node]:
549 feedback_fn(" - ERROR: instance %s should not run on node %s" %
555 def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
556 """Verify if there are any unknown volumes in the cluster.
558 The .os, .swap and backup volumes are ignored. All other volumes are
564 for node in node_vol_is:
565 for volume in node_vol_is[node]:
566 if node not in node_vol_should or volume not in node_vol_should[node]:
567 feedback_fn(" - ERROR: volume %s on node %s should not exist" %
572 def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
573 """Verify the list of running instances.
575 This checks what instances are running but unknown to the cluster.
579 for node in node_instance:
580 for runninginstance in node_instance[node]:
581 if runninginstance not in instancelist:
582 feedback_fn(" - ERROR: instance %s on node %s should not exist" %
583 (runninginstance, node))
587 def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
588 """Verify N+1 Memory Resilience.
590 Check that if one single node dies we can still start all the instances it
596 for node, nodeinfo in node_info.iteritems():
597 # This code checks that every node which is now listed as secondary has
598 # enough memory to host all instances it is supposed to should a single
599 # other node in the cluster fail.
600 # FIXME: not ready for failover to an arbitrary node
601 # FIXME: does not support file-backed instances
602 # WARNING: we currently take into account down instances as well as up
603 # ones, considering that even if they're down someone might want to start
604 # them even in the event of a node failure.
605 for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
607 for instance in instances:
608 needed_mem += instance_cfg[instance].memory
609 if nodeinfo['mfree'] < needed_mem:
610 feedback_fn(" - ERROR: not enough memory on node %s to accomodate"
611 " failovers should node %s fail" % (node, prinode))
615 def CheckPrereq(self):
616 """Check prerequisites.
618 Transform the list of checks we're going to skip into a set and check that
619 all its members are valid.
622 self.skip_set = frozenset(self.op.skip_checks)
623 if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
624 raise errors.OpPrereqError("Invalid checks to be skipped specified")
626 def BuildHooksEnv(self):
629 Cluster-Verify hooks just rone in the post phase and their failure makes
630 the output be logged in the verify output and the verification to fail.
633 all_nodes = self.cfg.GetNodeList()
634 # TODO: populate the environment with useful information for verify hooks
636 return env, [], all_nodes
638 def Exec(self, feedback_fn):
639 """Verify integrity of cluster, performing various test on nodes.
643 feedback_fn("* Verifying global settings")
644 for msg in self.cfg.VerifyConfig():
645 feedback_fn(" - ERROR: %s" % msg)
647 vg_name = self.cfg.GetVGName()
648 nodelist = utils.NiceSort(self.cfg.GetNodeList())
649 nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
650 instancelist = utils.NiceSort(self.cfg.GetInstanceList())
651 i_non_redundant = [] # Non redundant instances
657 # FIXME: verify OS list
659 file_names = list(self.sstore.GetFileList())
660 file_names.append(constants.SSL_CERT_FILE)
661 file_names.append(constants.CLUSTER_CONF_FILE)
662 local_checksums = utils.FingerprintFiles(file_names)
664 feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
665 all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
666 all_instanceinfo = rpc.call_instance_list(nodelist)
667 all_vglist = rpc.call_vg_list(nodelist)
668 node_verify_param = {
669 'filelist': file_names,
670 'nodelist': nodelist,
672 'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
673 for node in nodeinfo]
675 all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
676 all_rversion = rpc.call_version(nodelist)
677 all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
679 for node in nodelist:
680 feedback_fn("* Verifying node %s" % node)
681 result = self._VerifyNode(node, file_names, local_checksums,
682 all_vglist[node], all_nvinfo[node],
683 all_rversion[node], feedback_fn)
687 volumeinfo = all_volumeinfo[node]
689 if isinstance(volumeinfo, basestring):
690 feedback_fn(" - ERROR: LVM problem on node %s: %s" %
691 (node, volumeinfo[-400:].encode('string_escape')))
693 node_volume[node] = {}
694 elif not isinstance(volumeinfo, dict):
695 feedback_fn(" - ERROR: connection to %s failed" % (node,))
699 node_volume[node] = volumeinfo
702 nodeinstance = all_instanceinfo[node]
703 if type(nodeinstance) != list:
704 feedback_fn(" - ERROR: connection to %s failed" % (node,))
708 node_instance[node] = nodeinstance
711 nodeinfo = all_ninfo[node]
712 if not isinstance(nodeinfo, dict):
713 feedback_fn(" - ERROR: connection to %s failed" % (node,))
719 "mfree": int(nodeinfo['memory_free']),
720 "dfree": int(nodeinfo['vg_free']),
723 # dictionary holding all instances this node is secondary for,
724 # grouped by their primary node. Each key is a cluster node, and each
725 # value is a list of instances which have the key as primary and the
726 # current node as secondary. this is handy to calculate N+1 memory
727 # availability if you can only failover from a primary to its
729 "sinst-by-pnode": {},
732 feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
738 for instance in instancelist:
739 feedback_fn("* Verifying instance %s" % instance)
740 inst_config = self.cfg.GetInstanceInfo(instance)
741 result = self._VerifyInstance(instance, inst_config, node_volume,
742 node_instance, feedback_fn)
745 inst_config.MapLVsByNode(node_vol_should)
747 instance_cfg[instance] = inst_config
749 pnode = inst_config.primary_node
750 if pnode in node_info:
751 node_info[pnode]['pinst'].append(instance)
753 feedback_fn(" - ERROR: instance %s, connection to primary node"
754 " %s failed" % (instance, pnode))
757 # If the instance is non-redundant we cannot survive losing its primary
758 # node, so we are not N+1 compliant. On the other hand we have no disk
759 # templates with more than one secondary so that situation is not well
761 # FIXME: does not support file-backed instances
762 if len(inst_config.secondary_nodes) == 0:
763 i_non_redundant.append(instance)
764 elif len(inst_config.secondary_nodes) > 1:
765 feedback_fn(" - WARNING: multiple secondaries for instance %s"
768 for snode in inst_config.secondary_nodes:
769 if snode in node_info:
770 node_info[snode]['sinst'].append(instance)
771 if pnode not in node_info[snode]['sinst-by-pnode']:
772 node_info[snode]['sinst-by-pnode'][pnode] = []
773 node_info[snode]['sinst-by-pnode'][pnode].append(instance)
775 feedback_fn(" - ERROR: instance %s, connection to secondary node"
776 " %s failed" % (instance, snode))
778 feedback_fn("* Verifying orphan volumes")
779 result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
783 feedback_fn("* Verifying remaining instances")
784 result = self._VerifyOrphanInstances(instancelist, node_instance,
788 if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
789 feedback_fn("* Verifying N+1 Memory redundancy")
790 result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
793 feedback_fn("* Other Notes")
795 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
796 % len(i_non_redundant))
800 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
801 """Analize the post-hooks' result, handle it, and send some
802 nicely-formatted feedback back to the user.
805 phase: the hooks phase that has just been run
806 hooks_results: the results of the multi-node hooks rpc call
807 feedback_fn: function to send feedback back to the caller
808 lu_result: previous Exec result
811 # We only really run POST phase hooks, and are only interested in their results
812 if phase == constants.HOOKS_PHASE_POST:
813 # Used to change hooks' output to proper indentation
814 indent_re = re.compile('^', re.M)
815 feedback_fn("* Hooks Results")
816 if not hooks_results:
817 feedback_fn(" - ERROR: general communication failure")
820 for node_name in hooks_results:
821 show_node_header = True
822 res = hooks_results[node_name]
823 if res is False or not isinstance(res, list):
824 feedback_fn(" Communication failure")
827 for script, hkr, output in res:
828 if hkr == constants.HKR_FAIL:
829 # The node header is only shown once, if there are
830 # failing hooks on that node
832 feedback_fn(" Node %s:" % node_name)
833 show_node_header = False
834 feedback_fn(" ERROR: Script %s failed, output:" % script)
835 output = indent_re.sub(' ', output)
836 feedback_fn("%s" % output)
842 class LUVerifyDisks(NoHooksLU):
843 """Verifies the cluster disks status.
848 def CheckPrereq(self):
849 """Check prerequisites.
851 This has no prerequisites.
856 def Exec(self, feedback_fn):
857 """Verify integrity of cluster disks.
860 result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
862 vg_name = self.cfg.GetVGName()
863 nodes = utils.NiceSort(self.cfg.GetNodeList())
864 instances = [self.cfg.GetInstanceInfo(name)
865 for name in self.cfg.GetInstanceList()]
868 for inst in instances:
870 if (inst.status != "up" or
871 inst.disk_template not in constants.DTS_NET_MIRROR):
873 inst.MapLVsByNode(inst_lvs)
874 # transform { iname: {node: [vol,],},} to {(node, vol): iname}
875 for node, vol_list in inst_lvs.iteritems():
877 nv_dict[(node, vol)] = inst
882 node_lvs = rpc.call_volume_list(nodes, vg_name)
889 if isinstance(lvs, basestring):
890 logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
892 elif not isinstance(lvs, dict):
893 logger.Info("connection to node %s failed or invalid data returned" %
895 res_nodes.append(node)
898 for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
899 inst = nv_dict.pop((node, lv_name), None)
900 if (not lv_online and inst is not None
901 and inst.name not in res_instances):
902 res_instances.append(inst.name)
904 # any leftover items in nv_dict are missing LVs, let's arrange the
906 for key, inst in nv_dict.iteritems():
907 if inst.name not in res_missing:
908 res_missing[inst.name] = []
909 res_missing[inst.name].append(key)
914 class LURenameCluster(LogicalUnit):
915 """Rename the cluster.
918 HPATH = "cluster-rename"
919 HTYPE = constants.HTYPE_CLUSTER
923 def BuildHooksEnv(self):
928 "OP_TARGET": self.sstore.GetClusterName(),
929 "NEW_NAME": self.op.name,
931 mn = self.sstore.GetMasterNode()
932 return env, [mn], [mn]
934 def CheckPrereq(self):
935 """Verify that the passed name is a valid one.
938 hostname = utils.HostInfo(self.op.name)
940 new_name = hostname.name
941 self.ip = new_ip = hostname.ip
942 old_name = self.sstore.GetClusterName()
943 old_ip = self.sstore.GetMasterIP()
944 if new_name == old_name and new_ip == old_ip:
945 raise errors.OpPrereqError("Neither the name nor the IP address of the"
946 " cluster has changed")
948 if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
949 raise errors.OpPrereqError("The given cluster IP address (%s) is"
950 " reachable on the network. Aborting." %
953 self.op.name = new_name
955 def Exec(self, feedback_fn):
956 """Rename the cluster.
959 clustername = self.op.name
963 # shutdown the master IP
964 master = ss.GetMasterNode()
965 if not rpc.call_node_stop_master(master):
966 raise errors.OpExecError("Could not disable the master role")
970 ss.SetKey(ss.SS_MASTER_IP, ip)
971 ss.SetKey(ss.SS_CLUSTER_NAME, clustername)
973 # Distribute updated ss config to all nodes
974 myself = self.cfg.GetNodeInfo(master)
975 dist_nodes = self.cfg.GetNodeList()
976 if myself.name in dist_nodes:
977 dist_nodes.remove(myself.name)
979 logger.Debug("Copying updated ssconf data to all nodes")
980 for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
981 fname = ss.KeyToFilename(keyname)
982 result = rpc.call_upload_file(dist_nodes, fname)
983 for to_node in dist_nodes:
984 if not result[to_node]:
985 logger.Error("copy of file %s to node %s failed" %
988 if not rpc.call_node_start_master(master):
989 logger.Error("Could not re-enable the master role on the master,"
990 " please restart manually.")
993 def _RecursiveCheckIfLVMBased(disk):
994 """Check if the given disk or its children are lvm-based.
997 disk: ganeti.objects.Disk object
1000 boolean indicating whether a LD_LV dev_type was found or not
1004 for chdisk in disk.children:
1005 if _RecursiveCheckIfLVMBased(chdisk):
1007 return disk.dev_type == constants.LD_LV
1010 class LUSetClusterParams(LogicalUnit):
1011 """Change the parameters of the cluster.
1014 HPATH = "cluster-modify"
1015 HTYPE = constants.HTYPE_CLUSTER
1018 def BuildHooksEnv(self):
1023 "OP_TARGET": self.sstore.GetClusterName(),
1024 "NEW_VG_NAME": self.op.vg_name,
1026 mn = self.sstore.GetMasterNode()
1027 return env, [mn], [mn]
1029 def CheckPrereq(self):
1030 """Check prerequisites.
1032 This checks whether the given params don't conflict and
1033 if the given volume group is valid.
1036 if not self.op.vg_name:
1037 instances = [self.cfg.GetInstanceInfo(name)
1038 for name in self.cfg.GetInstanceList()]
1039 for inst in instances:
1040 for disk in inst.disks:
1041 if _RecursiveCheckIfLVMBased(disk):
1042 raise errors.OpPrereqError("Cannot disable lvm storage while"
1043 " lvm-based instances exist")
1045 # if vg_name not None, checks given volume group on all nodes
1047 node_list = self.cfg.GetNodeList()
1048 vglist = rpc.call_vg_list(node_list)
1049 for node in node_list:
1050 vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
1051 constants.MIN_VG_SIZE)
1053 raise errors.OpPrereqError("Error on node '%s': %s" %
1056 def Exec(self, feedback_fn):
1057 """Change the parameters of the cluster.
1060 if self.op.vg_name != self.cfg.GetVGName():
1061 self.cfg.SetVGName(self.op.vg_name)
1063 feedback_fn("Cluster LVM configuration already in desired"
1064 " state, not changing")
1067 def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
1068 """Sleep and poll for an instance's disk to sync.
1071 if not instance.disks:
1075 proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1077 node = instance.primary_node
1079 for dev in instance.disks:
1080 cfgw.SetDiskID(dev, node)
1086 cumul_degraded = False
1087 rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1089 proc.LogWarning("Can't get any data from node %s" % node)
1092 raise errors.RemoteError("Can't contact node %s for mirror data,"
1093 " aborting." % node)
1097 for i in range(len(rstats)):
1100 proc.LogWarning("Can't compute data for node %s/%s" %
1101 (node, instance.disks[i].iv_name))
1103 # we ignore the ldisk parameter
1104 perc_done, est_time, is_degraded, _ = mstat
1105 cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1106 if perc_done is not None:
1108 if est_time is not None:
1109 rem_time = "%d estimated seconds remaining" % est_time
1112 rem_time = "no time estimate"
1113 proc.LogInfo("- device %s: %5.2f%% done, %s" %
1114 (instance.disks[i].iv_name, perc_done, rem_time))
1118 time.sleep(min(60, max_time))
1121 proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1122 return not cumul_degraded
1125 def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
1126 """Check that mirrors are not degraded.
1128 The ldisk parameter, if True, will change the test from the
1129 is_degraded attribute (which represents overall non-ok status for
1130 the device(s)) to the ldisk (representing the local storage status).
1133 cfgw.SetDiskID(dev, node)
1140 if on_primary or dev.AssembleOnSecondary():
1141 rstats = rpc.call_blockdev_find(node, dev)
1143 logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
1146 result = result and (not rstats[idx])
1148 for child in dev.children:
1149 result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)
1154 class LUDiagnoseOS(NoHooksLU):
1155 """Logical unit for OS diagnose/query.
1158 _OP_REQP = ["output_fields", "names"]
1160 def CheckPrereq(self):
1161 """Check prerequisites.
1163 This always succeeds, since this is a pure query LU.
1167 raise errors.OpPrereqError("Selective OS query not supported")
1169 self.dynamic_fields = frozenset(["name", "valid", "node_status"])
1170 _CheckOutputFields(static=[],
1171 dynamic=self.dynamic_fields,
1172 selected=self.op.output_fields)
1175 def _DiagnoseByOS(node_list, rlist):
1176 """Remaps a per-node return list into an a per-os per-node dictionary
1179 node_list: a list with the names of all nodes
1180 rlist: a map with node names as keys and OS objects as values
1183 map: a map with osnames as keys and as value another map, with
1185 keys and list of OS objects as values
1186 e.g. {"debian-etch": {"node1": [<object>,...],
1187 "node2": [<object>,]}
1192 for node_name, nr in rlist.iteritems():
1196 if os_obj.name not in all_os:
1197 # build a list of nodes for this os containing empty lists
1198 # for each node in node_list
1199 all_os[os_obj.name] = {}
1200 for nname in node_list:
1201 all_os[os_obj.name][nname] = []
1202 all_os[os_obj.name][node_name].append(os_obj)
1205 def Exec(self, feedback_fn):
1206 """Compute the list of OSes.
1209 node_list = self.cfg.GetNodeList()
1210 node_data = rpc.call_os_diagnose(node_list)
1211 if node_data == False:
1212 raise errors.OpExecError("Can't gather the list of OSes")
1213 pol = self._DiagnoseByOS(node_list, node_data)
1215 for os_name, os_data in pol.iteritems():
1217 for field in self.op.output_fields:
1220 elif field == "valid":
1221 val = utils.all([osl and osl[0] for osl in os_data.values()])
1222 elif field == "node_status":
1224 for node_name, nos_list in os_data.iteritems():
1225 val[node_name] = [(v.status, v.path) for v in nos_list]
1227 raise errors.ParameterError(field)
1234 class LURemoveNode(LogicalUnit):
1235 """Logical unit for removing a node.
1238 HPATH = "node-remove"
1239 HTYPE = constants.HTYPE_NODE
1240 _OP_REQP = ["node_name"]
1242 def BuildHooksEnv(self):
1245 This doesn't run on the target node in the pre phase as a failed
1246 node would then be impossible to remove.
1250 "OP_TARGET": self.op.node_name,
1251 "NODE_NAME": self.op.node_name,
1253 all_nodes = self.cfg.GetNodeList()
1254 all_nodes.remove(self.op.node_name)
1255 return env, all_nodes, all_nodes
1257 def CheckPrereq(self):
1258 """Check prerequisites.
1261 - the node exists in the configuration
1262 - it does not have primary or secondary instances
1263 - it's not the master
1265 Any errors are signalled by raising errors.OpPrereqError.
1268 node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1270 raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1272 instance_list = self.cfg.GetInstanceList()
1274 masternode = self.sstore.GetMasterNode()
1275 if node.name == masternode:
1276 raise errors.OpPrereqError("Node is the master node,"
1277 " you need to failover first.")
1279 for instance_name in instance_list:
1280 instance = self.cfg.GetInstanceInfo(instance_name)
1281 if node.name == instance.primary_node:
1282 raise errors.OpPrereqError("Instance %s still running on the node,"
1283 " please remove first." % instance_name)
1284 if node.name in instance.secondary_nodes:
1285 raise errors.OpPrereqError("Instance %s has node as a secondary,"
1286 " please remove first." % instance_name)
1287 self.op.node_name = node.name
1290 def Exec(self, feedback_fn):
1291 """Removes the node from the cluster.
1295 logger.Info("stopping the node daemon and removing configs from node %s" %
1298 rpc.call_node_leave_cluster(node.name)
1300 logger.Info("Removing node %s from config" % node.name)
1302 self.cfg.RemoveNode(node.name)
1303 # Remove the node from the Ganeti Lock Manager
1304 self.context.glm.remove(locking.LEVEL_NODE, node.name)
1306 utils.RemoveHostFromEtcHosts(node.name)
1309 class LUQueryNodes(NoHooksLU):
1310 """Logical unit for querying nodes.
1313 _OP_REQP = ["output_fields", "names"]
1315 def CheckPrereq(self):
1316 """Check prerequisites.
1318 This checks that the fields required are valid output fields.
1321 self.dynamic_fields = frozenset([
1323 "mtotal", "mnode", "mfree",
1328 _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
1329 "pinst_list", "sinst_list",
1330 "pip", "sip", "tags"],
1331 dynamic=self.dynamic_fields,
1332 selected=self.op.output_fields)
1334 self.wanted = _GetWantedNodes(self, self.op.names)
1336 def Exec(self, feedback_fn):
1337 """Computes the list of nodes and their attributes.
1340 nodenames = self.wanted
1341 nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
1343 # begin data gathering
1345 if self.dynamic_fields.intersection(self.op.output_fields):
1347 node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
1348 for name in nodenames:
1349 nodeinfo = node_data.get(name, None)
1352 "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
1353 "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
1354 "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
1355 "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
1356 "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
1357 "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
1358 "bootid": nodeinfo['bootid'],
1361 live_data[name] = {}
1363 live_data = dict.fromkeys(nodenames, {})
1365 node_to_primary = dict([(name, set()) for name in nodenames])
1366 node_to_secondary = dict([(name, set()) for name in nodenames])
1368 inst_fields = frozenset(("pinst_cnt", "pinst_list",
1369 "sinst_cnt", "sinst_list"))
1370 if inst_fields & frozenset(self.op.output_fields):
1371 instancelist = self.cfg.GetInstanceList()
1373 for instance_name in instancelist:
1374 inst = self.cfg.GetInstanceInfo(instance_name)
1375 if inst.primary_node in node_to_primary:
1376 node_to_primary[inst.primary_node].add(inst.name)
1377 for secnode in inst.secondary_nodes:
1378 if secnode in node_to_secondary:
1379 node_to_secondary[secnode].add(inst.name)
1381 # end data gathering
1384 for node in nodelist:
1386 for field in self.op.output_fields:
1389 elif field == "pinst_list":
1390 val = list(node_to_primary[node.name])
1391 elif field == "sinst_list":
1392 val = list(node_to_secondary[node.name])
1393 elif field == "pinst_cnt":
1394 val = len(node_to_primary[node.name])
1395 elif field == "sinst_cnt":
1396 val = len(node_to_secondary[node.name])
1397 elif field == "pip":
1398 val = node.primary_ip
1399 elif field == "sip":
1400 val = node.secondary_ip
1401 elif field == "tags":
1402 val = list(node.GetTags())
1403 elif field in self.dynamic_fields:
1404 val = live_data[node.name].get(field, None)
1406 raise errors.ParameterError(field)
1407 node_output.append(val)
1408 output.append(node_output)
1413 class LUQueryNodeVolumes(NoHooksLU):
1414 """Logical unit for getting volumes on node(s).
1417 _OP_REQP = ["nodes", "output_fields"]
1419 def CheckPrereq(self):
1420 """Check prerequisites.
1422 This checks that the fields required are valid output fields.
1425 self.nodes = _GetWantedNodes(self, self.op.nodes)
1427 _CheckOutputFields(static=["node"],
1428 dynamic=["phys", "vg", "name", "size", "instance"],
1429 selected=self.op.output_fields)
1432 def Exec(self, feedback_fn):
1433 """Computes the list of nodes and their attributes.
1436 nodenames = self.nodes
1437 volumes = rpc.call_node_volumes(nodenames)
1439 ilist = [self.cfg.GetInstanceInfo(iname) for iname
1440 in self.cfg.GetInstanceList()]
1442 lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
1445 for node in nodenames:
1446 if node not in volumes or not volumes[node]:
1449 node_vols = volumes[node][:]
1450 node_vols.sort(key=lambda vol: vol['dev'])
1452 for vol in node_vols:
1454 for field in self.op.output_fields:
1457 elif field == "phys":
1461 elif field == "name":
1463 elif field == "size":
1464 val = int(float(vol['size']))
1465 elif field == "instance":
1467 if node not in lv_by_node[inst]:
1469 if vol['name'] in lv_by_node[inst][node]:
1475 raise errors.ParameterError(field)
1476 node_output.append(str(val))
1478 output.append(node_output)
1483 class LUAddNode(LogicalUnit):
1484 """Logical unit for adding node to the cluster.
1488 HTYPE = constants.HTYPE_NODE
1489 _OP_REQP = ["node_name"]
1491 def BuildHooksEnv(self):
1494 This will run on all nodes before, and on all nodes + the new node after.
1498 "OP_TARGET": self.op.node_name,
1499 "NODE_NAME": self.op.node_name,
1500 "NODE_PIP": self.op.primary_ip,
1501 "NODE_SIP": self.op.secondary_ip,
1503 nodes_0 = self.cfg.GetNodeList()
1504 nodes_1 = nodes_0 + [self.op.node_name, ]
1505 return env, nodes_0, nodes_1
1507 def CheckPrereq(self):
1508 """Check prerequisites.
1511 - the new node is not already in the config
1513 - its parameters (single/dual homed) matches the cluster
1515 Any errors are signalled by raising errors.OpPrereqError.
1518 node_name = self.op.node_name
1521 dns_data = utils.HostInfo(node_name)
1523 node = dns_data.name
1524 primary_ip = self.op.primary_ip = dns_data.ip
1525 secondary_ip = getattr(self.op, "secondary_ip", None)
1526 if secondary_ip is None:
1527 secondary_ip = primary_ip
1528 if not utils.IsValidIP(secondary_ip):
1529 raise errors.OpPrereqError("Invalid secondary IP given")
1530 self.op.secondary_ip = secondary_ip
1532 node_list = cfg.GetNodeList()
1533 if not self.op.readd and node in node_list:
1534 raise errors.OpPrereqError("Node %s is already in the configuration" %
1536 elif self.op.readd and node not in node_list:
1537 raise errors.OpPrereqError("Node %s is not in the configuration" % node)
1539 for existing_node_name in node_list:
1540 existing_node = cfg.GetNodeInfo(existing_node_name)
1542 if self.op.readd and node == existing_node_name:
1543 if (existing_node.primary_ip != primary_ip or
1544 existing_node.secondary_ip != secondary_ip):
1545 raise errors.OpPrereqError("Readded node doesn't have the same IP"
1546 " address configuration as before")
1549 if (existing_node.primary_ip == primary_ip or
1550 existing_node.secondary_ip == primary_ip or
1551 existing_node.primary_ip == secondary_ip or
1552 existing_node.secondary_ip == secondary_ip):
1553 raise errors.OpPrereqError("New node ip address(es) conflict with"
1554 " existing node %s" % existing_node.name)
1556 # check that the type of the node (single versus dual homed) is the
1557 # same as for the master
1558 myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
1559 master_singlehomed = myself.secondary_ip == myself.primary_ip
1560 newbie_singlehomed = secondary_ip == primary_ip
1561 if master_singlehomed != newbie_singlehomed:
1562 if master_singlehomed:
1563 raise errors.OpPrereqError("The master has no private ip but the"
1564 " new node has one")
1566 raise errors.OpPrereqError("The master has a private ip but the"
1567 " new node doesn't have one")
1569 # checks reachablity
1570 if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
1571 raise errors.OpPrereqError("Node not reachable by ping")
1573 if not newbie_singlehomed:
1574 # check reachability from my secondary ip to newbie's secondary ip
1575 if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
1576 source=myself.secondary_ip):
1577 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
1578 " based ping to noded port")
1580 self.new_node = objects.Node(name=node,
1581 primary_ip=primary_ip,
1582 secondary_ip=secondary_ip)
1584 def Exec(self, feedback_fn):
1585 """Adds the new node to the cluster.
1588 new_node = self.new_node
1589 node = new_node.name
1591 # check connectivity
1592 result = rpc.call_version([node])[node]
1594 if constants.PROTOCOL_VERSION == result:
1595 logger.Info("communication to node %s fine, sw version %s match" %
1598 raise errors.OpExecError("Version mismatch master version %s,"
1599 " node version %s" %
1600 (constants.PROTOCOL_VERSION, result))
1602 raise errors.OpExecError("Cannot get version from the new node")
1605 logger.Info("copy ssh key to node %s" % node)
1606 priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
1608 keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
1609 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
1615 keyarray.append(f.read())
1619 result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
1620 keyarray[3], keyarray[4], keyarray[5])
1623 raise errors.OpExecError("Cannot transfer ssh keys to the new node")
1625 # Add node to our /etc/hosts, and add key to known_hosts
1626 utils.AddHostToEtcHosts(new_node.name)
1628 if new_node.secondary_ip != new_node.primary_ip:
1629 if not rpc.call_node_tcp_ping(new_node.name,
1630 constants.LOCALHOST_IP_ADDRESS,
1631 new_node.secondary_ip,
1632 constants.DEFAULT_NODED_PORT,
1634 raise errors.OpExecError("Node claims it doesn't have the secondary ip"
1635 " you gave (%s). Please fix and re-run this"
1636 " command." % new_node.secondary_ip)
1638 node_verify_list = [self.sstore.GetMasterNode()]
1639 node_verify_param = {
1641 # TODO: do a node-net-test as well?
1644 result = rpc.call_node_verify(node_verify_list, node_verify_param)
1645 for verifier in node_verify_list:
1646 if not result[verifier]:
1647 raise errors.OpExecError("Cannot communicate with %s's node daemon"
1648 " for remote verification" % verifier)
1649 if result[verifier]['nodelist']:
1650 for failed in result[verifier]['nodelist']:
1651 feedback_fn("ssh/hostname verification failed %s -> %s" %
1652 (verifier, result[verifier]['nodelist'][failed]))
1653 raise errors.OpExecError("ssh/hostname verification failed.")
1655 # Distribute updated /etc/hosts and known_hosts to all nodes,
1656 # including the node just added
1657 myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
1658 dist_nodes = self.cfg.GetNodeList()
1659 if not self.op.readd:
1660 dist_nodes.append(node)
1661 if myself.name in dist_nodes:
1662 dist_nodes.remove(myself.name)
1664 logger.Debug("Copying hosts and known_hosts to all nodes")
1665 for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
1666 result = rpc.call_upload_file(dist_nodes, fname)
1667 for to_node in dist_nodes:
1668 if not result[to_node]:
1669 logger.Error("copy of file %s to node %s failed" %
1672 to_copy = self.sstore.GetFileList()
1673 if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
1674 to_copy.append(constants.VNC_PASSWORD_FILE)
1675 for fname in to_copy:
1676 result = rpc.call_upload_file([node], fname)
1677 if not result[node]:
1678 logger.Error("could not copy file %s to node %s" % (fname, node))
1680 if not self.op.readd:
1681 logger.Info("adding node %s to cluster.conf" % node)
1682 self.cfg.AddNode(new_node)
1683 # Add the new node to the Ganeti Lock Manager
1684 self.context.glm.add(locking.LEVEL_NODE, node)
1687 class LUMasterFailover(LogicalUnit):
1688 """Failover the master node to the current node.
1690 This is a special LU in that it must run on a non-master node.
1693 HPATH = "master-failover"
1694 HTYPE = constants.HTYPE_CLUSTER
1699 def BuildHooksEnv(self):
1702 This will run on the new master only in the pre phase, and on all
1703 the nodes in the post phase.
1707 "OP_TARGET": self.new_master,
1708 "NEW_MASTER": self.new_master,
1709 "OLD_MASTER": self.old_master,
1711 return env, [self.new_master], self.cfg.GetNodeList()
1713 def CheckPrereq(self):
1714 """Check prerequisites.
1716 This checks that we are not already the master.
1719 self.new_master = utils.HostInfo().name
1720 self.old_master = self.sstore.GetMasterNode()
1722 if self.old_master == self.new_master:
1723 raise errors.OpPrereqError("This commands must be run on the node"
1724 " where you want the new master to be."
1725 " %s is already the master" %
1728 def Exec(self, feedback_fn):
1729 """Failover the master node.
1731 This command, when run on a non-master node, will cause the current
1732 master to cease being master, and the non-master to become new
1736 #TODO: do not rely on gethostname returning the FQDN
1737 logger.Info("setting master to %s, old master: %s" %
1738 (self.new_master, self.old_master))
1740 if not rpc.call_node_stop_master(self.old_master):
1741 logger.Error("could disable the master role on the old master"
1742 " %s, please disable manually" % self.old_master)
1745 ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
1746 if not rpc.call_upload_file(self.cfg.GetNodeList(),
1747 ss.KeyToFilename(ss.SS_MASTER_NODE)):
1748 logger.Error("could not distribute the new simple store master file"
1749 " to the other nodes, please check.")
1751 if not rpc.call_node_start_master(self.new_master):
1752 logger.Error("could not start the master role on the new master"
1753 " %s, please check" % self.new_master)
1754 feedback_fn("Error in activating the master IP on the new master,"
1755 " please fix manually.")
1759 class LUQueryClusterInfo(NoHooksLU):
1760 """Query cluster configuration.
1767 def ExpandNames(self):
1768 self.needed_locks = {}
1770 def CheckPrereq(self):
1771 """No prerequsites needed for this LU.
1776 def Exec(self, feedback_fn):
1777 """Return cluster config.
1781 "name": self.sstore.GetClusterName(),
1782 "software_version": constants.RELEASE_VERSION,
1783 "protocol_version": constants.PROTOCOL_VERSION,
1784 "config_version": constants.CONFIG_VERSION,
1785 "os_api_version": constants.OS_API_VERSION,
1786 "export_version": constants.EXPORT_VERSION,
1787 "master": self.sstore.GetMasterNode(),
1788 "architecture": (platform.architecture()[0], platform.machine()),
1789 "hypervisor_type": self.sstore.GetHypervisorType(),
1795 class LUDumpClusterConfig(NoHooksLU):
1796 """Return a text-representation of the cluster-config.
1802 def ExpandNames(self):
1803 self.needed_locks = {}
1805 def CheckPrereq(self):
1806 """No prerequisites.
1811 def Exec(self, feedback_fn):
1812 """Dump a representation of the cluster config to the standard output.
1815 return self.cfg.DumpConfig()
1818 class LUActivateInstanceDisks(NoHooksLU):
1819 """Bring up an instance's disks.
1822 _OP_REQP = ["instance_name"]
1824 def CheckPrereq(self):
1825 """Check prerequisites.
1827 This checks that the instance is in the cluster.
1830 instance = self.cfg.GetInstanceInfo(
1831 self.cfg.ExpandInstanceName(self.op.instance_name))
1832 if instance is None:
1833 raise errors.OpPrereqError("Instance '%s' not known" %
1834 self.op.instance_name)
1835 self.instance = instance
1838 def Exec(self, feedback_fn):
1839 """Activate the disks.
1842 disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
1844 raise errors.OpExecError("Cannot activate block devices")
1849 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
1850 """Prepare the block devices for an instance.
1852 This sets up the block devices on all nodes.
1855 instance: a ganeti.objects.Instance object
1856 ignore_secondaries: if true, errors on secondary nodes won't result
1857 in an error return from the function
1860 false if the operation failed
1861 list of (host, instance_visible_name, node_visible_name) if the operation
1862 suceeded with the mapping from node devices to instance devices
1866 iname = instance.name
1867 # With the two passes mechanism we try to reduce the window of
1868 # opportunity for the race condition of switching DRBD to primary
1869 # before handshaking occured, but we do not eliminate it
1871 # The proper fix would be to wait (with some limits) until the
1872 # connection has been made and drbd transitions from WFConnection
1873 # into any other network-connected state (Connected, SyncTarget,
1876 # 1st pass, assemble on all nodes in secondary mode
1877 for inst_disk in instance.disks:
1878 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1879 cfg.SetDiskID(node_disk, node)
1880 result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
1882 logger.Error("could not prepare block device %s on node %s"
1883 " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
1884 if not ignore_secondaries:
1887 # FIXME: race condition on drbd migration to primary
1889 # 2nd pass, do only the primary node
1890 for inst_disk in instance.disks:
1891 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1892 if node != instance.primary_node:
1894 cfg.SetDiskID(node_disk, node)
1895 result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
1897 logger.Error("could not prepare block device %s on node %s"
1898 " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
1900 device_info.append((instance.primary_node, inst_disk.iv_name, result))
1902 # leave the disks configured for the primary node
1903 # this is a workaround that would be fixed better by
1904 # improving the logical/physical id handling
1905 for disk in instance.disks:
1906 cfg.SetDiskID(disk, instance.primary_node)
1908 return disks_ok, device_info
1911 def _StartInstanceDisks(cfg, instance, force):
1912 """Start the disks of an instance.
1915 disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
1916 ignore_secondaries=force)
1918 _ShutdownInstanceDisks(instance, cfg)
1919 if force is not None and not force:
1920 logger.Error("If the message above refers to a secondary node,"
1921 " you can retry the operation using '--force'.")
1922 raise errors.OpExecError("Disk consistency error")
1925 class LUDeactivateInstanceDisks(NoHooksLU):
1926 """Shutdown an instance's disks.
1929 _OP_REQP = ["instance_name"]
1931 def CheckPrereq(self):
1932 """Check prerequisites.
1934 This checks that the instance is in the cluster.
1937 instance = self.cfg.GetInstanceInfo(
1938 self.cfg.ExpandInstanceName(self.op.instance_name))
1939 if instance is None:
1940 raise errors.OpPrereqError("Instance '%s' not known" %
1941 self.op.instance_name)
1942 self.instance = instance
1944 def Exec(self, feedback_fn):
1945 """Deactivate the disks
1948 instance = self.instance
1949 ins_l = rpc.call_instance_list([instance.primary_node])
1950 ins_l = ins_l[instance.primary_node]
1951 if not type(ins_l) is list:
1952 raise errors.OpExecError("Can't contact node '%s'" %
1953 instance.primary_node)
1955 if self.instance.name in ins_l:
1956 raise errors.OpExecError("Instance is running, can't shutdown"
1959 _ShutdownInstanceDisks(instance, self.cfg)
1962 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
1963 """Shutdown block devices of an instance.
1965 This does the shutdown on all nodes of the instance.
1967 If the ignore_primary is false, errors on the primary node are
1972 for disk in instance.disks:
1973 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1974 cfg.SetDiskID(top_disk, node)
1975 if not rpc.call_blockdev_shutdown(node, top_disk):
1976 logger.Error("could not shutdown block device %s on node %s" %
1977 (disk.iv_name, node))
1978 if not ignore_primary or node != instance.primary_node:
1983 def _CheckNodeFreeMemory(cfg, node, reason, requested):
1984 """Checks if a node has enough free memory.
1986 This function check if a given node has the needed amount of free
1987 memory. In case the node has less memory or we cannot get the
1988 information from the node, this function raise an OpPrereqError
1992 - cfg: a ConfigWriter instance
1993 - node: the node name
1994 - reason: string to use in the error message
1995 - requested: the amount of memory in MiB
1998 nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
1999 if not nodeinfo or not isinstance(nodeinfo, dict):
2000 raise errors.OpPrereqError("Could not contact node %s for resource"
2001 " information" % (node,))
2003 free_mem = nodeinfo[node].get('memory_free')
2004 if not isinstance(free_mem, int):
2005 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2006 " was '%s'" % (node, free_mem))
2007 if requested > free_mem:
2008 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2009 " needed %s MiB, available %s MiB" %
2010 (node, reason, requested, free_mem))
2013 class LUStartupInstance(LogicalUnit):
2014 """Starts an instance.
2017 HPATH = "instance-start"
2018 HTYPE = constants.HTYPE_INSTANCE
2019 _OP_REQP = ["instance_name", "force"]
2021 def BuildHooksEnv(self):
2024 This runs on master, primary and secondary nodes of the instance.
2028 "FORCE": self.op.force,
2030 env.update(_BuildInstanceHookEnvByObject(self.instance))
2031 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2032 list(self.instance.secondary_nodes))
2035 def CheckPrereq(self):
2036 """Check prerequisites.
2038 This checks that the instance is in the cluster.
2041 instance = self.cfg.GetInstanceInfo(
2042 self.cfg.ExpandInstanceName(self.op.instance_name))
2043 if instance is None:
2044 raise errors.OpPrereqError("Instance '%s' not known" %
2045 self.op.instance_name)
2047 # check bridges existance
2048 _CheckInstanceBridgesExist(instance)
2050 _CheckNodeFreeMemory(self.cfg, instance.primary_node,
2051 "starting instance %s" % instance.name,
2054 self.instance = instance
2055 self.op.instance_name = instance.name
2057 def Exec(self, feedback_fn):
2058 """Start the instance.
2061 instance = self.instance
2062 force = self.op.force
2063 extra_args = getattr(self.op, "extra_args", "")
2065 self.cfg.MarkInstanceUp(instance.name)
2067 node_current = instance.primary_node
2069 _StartInstanceDisks(self.cfg, instance, force)
2071 if not rpc.call_instance_start(node_current, instance, extra_args):
2072 _ShutdownInstanceDisks(instance, self.cfg)
2073 raise errors.OpExecError("Could not start instance")
2076 class LURebootInstance(LogicalUnit):
2077 """Reboot an instance.
2080 HPATH = "instance-reboot"
2081 HTYPE = constants.HTYPE_INSTANCE
2082 _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2084 def BuildHooksEnv(self):
2087 This runs on master, primary and secondary nodes of the instance.
2091 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2093 env.update(_BuildInstanceHookEnvByObject(self.instance))
2094 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2095 list(self.instance.secondary_nodes))
2098 def CheckPrereq(self):
2099 """Check prerequisites.
2101 This checks that the instance is in the cluster.
2104 instance = self.cfg.GetInstanceInfo(
2105 self.cfg.ExpandInstanceName(self.op.instance_name))
2106 if instance is None:
2107 raise errors.OpPrereqError("Instance '%s' not known" %
2108 self.op.instance_name)
2110 # check bridges existance
2111 _CheckInstanceBridgesExist(instance)
2113 self.instance = instance
2114 self.op.instance_name = instance.name
2116 def Exec(self, feedback_fn):
2117 """Reboot the instance.
2120 instance = self.instance
2121 ignore_secondaries = self.op.ignore_secondaries
2122 reboot_type = self.op.reboot_type
2123 extra_args = getattr(self.op, "extra_args", "")
2125 node_current = instance.primary_node
2127 if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2128 constants.INSTANCE_REBOOT_HARD,
2129 constants.INSTANCE_REBOOT_FULL]:
2130 raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2131 (constants.INSTANCE_REBOOT_SOFT,
2132 constants.INSTANCE_REBOOT_HARD,
2133 constants.INSTANCE_REBOOT_FULL))
2135 if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
2136 constants.INSTANCE_REBOOT_HARD]:
2137 if not rpc.call_instance_reboot(node_current, instance,
2138 reboot_type, extra_args):
2139 raise errors.OpExecError("Could not reboot instance")
2141 if not rpc.call_instance_shutdown(node_current, instance):
2142 raise errors.OpExecError("could not shutdown instance for full reboot")
2143 _ShutdownInstanceDisks(instance, self.cfg)
2144 _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
2145 if not rpc.call_instance_start(node_current, instance, extra_args):
2146 _ShutdownInstanceDisks(instance, self.cfg)
2147 raise errors.OpExecError("Could not start instance for full reboot")
2149 self.cfg.MarkInstanceUp(instance.name)
2152 class LUShutdownInstance(LogicalUnit):
2153 """Shutdown an instance.
2156 HPATH = "instance-stop"
2157 HTYPE = constants.HTYPE_INSTANCE
2158 _OP_REQP = ["instance_name"]
2160 def BuildHooksEnv(self):
2163 This runs on master, primary and secondary nodes of the instance.
2166 env = _BuildInstanceHookEnvByObject(self.instance)
2167 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2168 list(self.instance.secondary_nodes))
2171 def CheckPrereq(self):
2172 """Check prerequisites.
2174 This checks that the instance is in the cluster.
2177 instance = self.cfg.GetInstanceInfo(
2178 self.cfg.ExpandInstanceName(self.op.instance_name))
2179 if instance is None:
2180 raise errors.OpPrereqError("Instance '%s' not known" %
2181 self.op.instance_name)
2182 self.instance = instance
2184 def Exec(self, feedback_fn):
2185 """Shutdown the instance.
2188 instance = self.instance
2189 node_current = instance.primary_node
2190 self.cfg.MarkInstanceDown(instance.name)
2191 if not rpc.call_instance_shutdown(node_current, instance):
2192 logger.Error("could not shutdown instance")
2194 _ShutdownInstanceDisks(instance, self.cfg)
2197 class LUReinstallInstance(LogicalUnit):
2198 """Reinstall an instance.
2201 HPATH = "instance-reinstall"
2202 HTYPE = constants.HTYPE_INSTANCE
2203 _OP_REQP = ["instance_name"]
2205 def BuildHooksEnv(self):
2208 This runs on master, primary and secondary nodes of the instance.
2211 env = _BuildInstanceHookEnvByObject(self.instance)
2212 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2213 list(self.instance.secondary_nodes))
2216 def CheckPrereq(self):
2217 """Check prerequisites.
2219 This checks that the instance is in the cluster and is not running.
2222 instance = self.cfg.GetInstanceInfo(
2223 self.cfg.ExpandInstanceName(self.op.instance_name))
2224 if instance is None:
2225 raise errors.OpPrereqError("Instance '%s' not known" %
2226 self.op.instance_name)
2227 if instance.disk_template == constants.DT_DISKLESS:
2228 raise errors.OpPrereqError("Instance '%s' has no disks" %
2229 self.op.instance_name)
2230 if instance.status != "down":
2231 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2232 self.op.instance_name)
2233 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2235 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2236 (self.op.instance_name,
2237 instance.primary_node))
2239 self.op.os_type = getattr(self.op, "os_type", None)
2240 if self.op.os_type is not None:
2242 pnode = self.cfg.GetNodeInfo(
2243 self.cfg.ExpandNodeName(instance.primary_node))
2245 raise errors.OpPrereqError("Primary node '%s' is unknown" %
2247 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
2249 raise errors.OpPrereqError("OS '%s' not in supported OS list for"
2250 " primary node" % self.op.os_type)
2252 self.instance = instance
2254 def Exec(self, feedback_fn):
2255 """Reinstall the instance.
2258 inst = self.instance
2260 if self.op.os_type is not None:
2261 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2262 inst.os = self.op.os_type
2263 self.cfg.AddInstance(inst)
2265 _StartInstanceDisks(self.cfg, inst, None)
2267 feedback_fn("Running the instance OS create scripts...")
2268 if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
2269 raise errors.OpExecError("Could not install OS for instance %s"
2271 (inst.name, inst.primary_node))
2273 _ShutdownInstanceDisks(inst, self.cfg)
2276 class LURenameInstance(LogicalUnit):
2277 """Rename an instance.
2280 HPATH = "instance-rename"
2281 HTYPE = constants.HTYPE_INSTANCE
2282 _OP_REQP = ["instance_name", "new_name"]
2284 def BuildHooksEnv(self):
2287 This runs on master, primary and secondary nodes of the instance.
2290 env = _BuildInstanceHookEnvByObject(self.instance)
2291 env["INSTANCE_NEW_NAME"] = self.op.new_name
2292 nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
2293 list(self.instance.secondary_nodes))
2296 def CheckPrereq(self):
2297 """Check prerequisites.
2299 This checks that the instance is in the cluster and is not running.
2302 instance = self.cfg.GetInstanceInfo(
2303 self.cfg.ExpandInstanceName(self.op.instance_name))
2304 if instance is None:
2305 raise errors.OpPrereqError("Instance '%s' not known" %
2306 self.op.instance_name)
2307 if instance.status != "down":
2308 raise errors.OpPrereqError("Instance '%s' is marked to be up" %
2309 self.op.instance_name)
2310 remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
2312 raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
2313 (self.op.instance_name,
2314 instance.primary_node))
2315 self.instance = instance
2317 # new name verification
2318 name_info = utils.HostInfo(self.op.new_name)
2320 self.op.new_name = new_name = name_info.name
2321 instance_list = self.cfg.GetInstanceList()
2322 if new_name in instance_list:
2323 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2326 if not getattr(self.op, "ignore_ip", False):
2327 if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
2328 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2329 (name_info.ip, new_name))
2332 def Exec(self, feedback_fn):
2333 """Reinstall the instance.
2336 inst = self.instance
2337 old_name = inst.name
2339 if inst.disk_template == constants.DT_FILE:
2340 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2342 self.cfg.RenameInstance(inst.name, self.op.new_name)
2344 # re-read the instance from the configuration after rename
2345 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2347 if inst.disk_template == constants.DT_FILE:
2348 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2349 result = rpc.call_file_storage_dir_rename(inst.primary_node,
2350 old_file_storage_dir,
2351 new_file_storage_dir)
2354 raise errors.OpExecError("Could not connect to node '%s' to rename"
2355 " directory '%s' to '%s' (but the instance"
2356 " has been renamed in Ganeti)" % (
2357 inst.primary_node, old_file_storage_dir,
2358 new_file_storage_dir))
2361 raise errors.OpExecError("Could not rename directory '%s' to '%s'"
2362 " (but the instance has been renamed in"
2363 " Ganeti)" % (old_file_storage_dir,
2364 new_file_storage_dir))
2366 _StartInstanceDisks(self.cfg, inst, None)
2368 if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
2370 msg = ("Could run OS rename script for instance %s on node %s (but the"
2371 " instance has been renamed in Ganeti)" %
2372 (inst.name, inst.primary_node))
2375 _ShutdownInstanceDisks(inst, self.cfg)
2378 class LURemoveInstance(LogicalUnit):
2379 """Remove an instance.
2382 HPATH = "instance-remove"
2383 HTYPE = constants.HTYPE_INSTANCE
2384 _OP_REQP = ["instance_name", "ignore_failures"]
2386 def BuildHooksEnv(self):
2389 This runs on master, primary and secondary nodes of the instance.
2392 env = _BuildInstanceHookEnvByObject(self.instance)
2393 nl = [self.sstore.GetMasterNode()]
2396 def CheckPrereq(self):
2397 """Check prerequisites.
2399 This checks that the instance is in the cluster.
2402 instance = self.cfg.GetInstanceInfo(
2403 self.cfg.ExpandInstanceName(self.op.instance_name))
2404 if instance is None:
2405 raise errors.OpPrereqError("Instance '%s' not known" %
2406 self.op.instance_name)
2407 self.instance = instance
2409 def Exec(self, feedback_fn):
2410 """Remove the instance.
2413 instance = self.instance
2414 logger.Info("shutting down instance %s on node %s" %
2415 (instance.name, instance.primary_node))
2417 if not rpc.call_instance_shutdown(instance.primary_node, instance):
2418 if self.op.ignore_failures:
2419 feedback_fn("Warning: can't shutdown instance")
2421 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2422 (instance.name, instance.primary_node))
2424 logger.Info("removing block devices for instance %s" % instance.name)
2426 if not _RemoveDisks(instance, self.cfg):
2427 if self.op.ignore_failures:
2428 feedback_fn("Warning: can't remove instance's disks")
2430 raise errors.OpExecError("Can't remove instance's disks")
2432 logger.Info("removing instance %s out of cluster config" % instance.name)
2434 self.cfg.RemoveInstance(instance.name)
2435 # Remove the new instance from the Ganeti Lock Manager
2436 self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
2439 class LUQueryInstances(NoHooksLU):
2440 """Logical unit for querying instances.
2443 _OP_REQP = ["output_fields", "names"]
2445 def CheckPrereq(self):
2446 """Check prerequisites.
2448 This checks that the fields required are valid output fields.
2451 self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
2452 _CheckOutputFields(static=["name", "os", "pnode", "snodes",
2453 "admin_state", "admin_ram",
2454 "disk_template", "ip", "mac", "bridge",
2455 "sda_size", "sdb_size", "vcpus", "tags"],
2456 dynamic=self.dynamic_fields,
2457 selected=self.op.output_fields)
2459 self.wanted = _GetWantedInstances(self, self.op.names)
2461 def Exec(self, feedback_fn):
2462 """Computes the list of nodes and their attributes.
2465 instance_names = self.wanted
2466 instance_list = [self.cfg.GetInstanceInfo(iname) for iname
2469 # begin data gathering
2471 nodes = frozenset([inst.primary_node for inst in instance_list])
2474 if self.dynamic_fields.intersection(self.op.output_fields):
2476 node_data = rpc.call_all_instances_info(nodes)
2478 result = node_data[name]
2480 live_data.update(result)
2481 elif result == False:
2482 bad_nodes.append(name)
2483 # else no instance is alive
2485 live_data = dict([(name, {}) for name in instance_names])
2487 # end data gathering
2490 for instance in instance_list:
2492 for field in self.op.output_fields:
2497 elif field == "pnode":
2498 val = instance.primary_node
2499 elif field == "snodes":
2500 val = list(instance.secondary_nodes)
2501 elif field == "admin_state":
2502 val = (instance.status != "down")
2503 elif field == "oper_state":
2504 if instance.primary_node in bad_nodes:
2507 val = bool(live_data.get(instance.name))
2508 elif field == "status":
2509 if instance.primary_node in bad_nodes:
2510 val = "ERROR_nodedown"
2512 running = bool(live_data.get(instance.name))
2514 if instance.status != "down":
2519 if instance.status != "down":
2523 elif field == "admin_ram":
2524 val = instance.memory
2525 elif field == "oper_ram":
2526 if instance.primary_node in bad_nodes:
2528 elif instance.name in live_data:
2529 val = live_data[instance.name].get("memory", "?")
2532 elif field == "disk_template":
2533 val = instance.disk_template
2535 val = instance.nics[0].ip
2536 elif field == "bridge":
2537 val = instance.nics[0].bridge
2538 elif field == "mac":
2539 val = instance.nics[0].mac
2540 elif field == "sda_size" or field == "sdb_size":
2541 disk = instance.FindDisk(field[:3])
2546 elif field == "vcpus":
2547 val = instance.vcpus
2548 elif field == "tags":
2549 val = list(instance.GetTags())
2551 raise errors.ParameterError(field)
2558 class LUFailoverInstance(LogicalUnit):
2559 """Failover an instance.
2562 HPATH = "instance-failover"
2563 HTYPE = constants.HTYPE_INSTANCE
2564 _OP_REQP = ["instance_name", "ignore_consistency"]
2566 def BuildHooksEnv(self):
2569 This runs on master, primary and secondary nodes of the instance.
2573 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2575 env.update(_BuildInstanceHookEnvByObject(self.instance))
2576 nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
2579 def CheckPrereq(self):
2580 """Check prerequisites.
2582 This checks that the instance is in the cluster.
2585 instance = self.cfg.GetInstanceInfo(
2586 self.cfg.ExpandInstanceName(self.op.instance_name))
2587 if instance is None:
2588 raise errors.OpPrereqError("Instance '%s' not known" %
2589 self.op.instance_name)
2591 if instance.disk_template not in constants.DTS_NET_MIRROR:
2592 raise errors.OpPrereqError("Instance's disk layout is not"
2593 " network mirrored, cannot failover.")
2595 secondary_nodes = instance.secondary_nodes
2596 if not secondary_nodes:
2597 raise errors.ProgrammerError("no secondary node but using "
2598 "a mirrored disk template")
2600 target_node = secondary_nodes[0]
2601 # check memory requirements on the secondary node
2602 _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
2603 instance.name, instance.memory)
2605 # check bridge existance
2606 brlist = [nic.bridge for nic in instance.nics]
2607 if not rpc.call_bridges_exist(target_node, brlist):
2608 raise errors.OpPrereqError("One or more target bridges %s does not"
2609 " exist on destination node '%s'" %
2610 (brlist, target_node))
2612 self.instance = instance
2614 def Exec(self, feedback_fn):
2615 """Failover an instance.
2617 The failover is done by shutting it down on its present node and
2618 starting it on the secondary.
2621 instance = self.instance
2623 source_node = instance.primary_node
2624 target_node = instance.secondary_nodes[0]
2626 feedback_fn("* checking disk consistency between source and target")
2627 for dev in instance.disks:
2628 # for drbd, these are drbd over lvm
2629 if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
2630 if instance.status == "up" and not self.op.ignore_consistency:
2631 raise errors.OpExecError("Disk %s is degraded on target node,"
2632 " aborting failover." % dev.iv_name)
2634 feedback_fn("* shutting down instance on source node")
2635 logger.Info("Shutting down instance %s on node %s" %
2636 (instance.name, source_node))
2638 if not rpc.call_instance_shutdown(source_node, instance):
2639 if self.op.ignore_consistency:
2640 logger.Error("Could not shutdown instance %s on node %s. Proceeding"
2641 " anyway. Please make sure node %s is down" %
2642 (instance.name, source_node, source_node))
2644 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
2645 (instance.name, source_node))
2647 feedback_fn("* deactivating the instance's disks on source node")
2648 if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
2649 raise errors.OpExecError("Can't shut down the instance's disks.")
2651 instance.primary_node = target_node
2652 # distribute new instance config to the other nodes
2653 self.cfg.Update(instance)
2655 # Only start the instance if it's marked as up
2656 if instance.status == "up":
2657 feedback_fn("* activating the instance's disks on target node")
2658 logger.Info("Starting instance %s on node %s" %
2659 (instance.name, target_node))
2661 disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
2662 ignore_secondaries=True)
2664 _ShutdownInstanceDisks(instance, self.cfg)
2665 raise errors.OpExecError("Can't activate the instance's disks")
2667 feedback_fn("* starting the instance on the target node")
2668 if not rpc.call_instance_start(target_node, instance, None):
2669 _ShutdownInstanceDisks(instance, self.cfg)
2670 raise errors.OpExecError("Could not start instance %s on node %s." %
2671 (instance.name, target_node))
2674 def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
2675 """Create a tree of block devices on the primary node.
2677 This always creates all devices.
2681 for child in device.children:
2682 if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
2685 cfg.SetDiskID(device, node)
2686 new_id = rpc.call_blockdev_create(node, device, device.size,
2687 instance.name, True, info)
2690 if device.physical_id is None:
2691 device.physical_id = new_id
2695 def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
2696 """Create a tree of block devices on a secondary node.
2698 If this device type has to be created on secondaries, create it and
2701 If not, just recurse to children keeping the same 'force' value.
2704 if device.CreateOnSecondary():
2707 for child in device.children:
2708 if not _CreateBlockDevOnSecondary(cfg, node, instance,
2709 child, force, info):
2714 cfg.SetDiskID(device, node)
2715 new_id = rpc.call_blockdev_create(node, device, device.size,
2716 instance.name, False, info)
2719 if device.physical_id is None:
2720 device.physical_id = new_id
2724 def _GenerateUniqueNames(cfg, exts):
2725 """Generate a suitable LV name.
2727 This will generate a logical volume name for the given instance.
2732 new_id = cfg.GenerateUniqueID()
2733 results.append("%s%s" % (new_id, val))
2737 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
2738 """Generate a drbd8 device complete with its children.
2741 port = cfg.AllocatePort()
2742 vgname = cfg.GetVGName()
2743 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
2744 logical_id=(vgname, names[0]))
2745 dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
2746 logical_id=(vgname, names[1]))
2747 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
2748 logical_id = (primary, secondary, port),
2749 children = [dev_data, dev_meta],
2754 def _GenerateDiskTemplate(cfg, template_name,
2755 instance_name, primary_node,
2756 secondary_nodes, disk_sz, swap_sz,
2757 file_storage_dir, file_driver):
2758 """Generate the entire disk layout for a given template type.
2761 #TODO: compute space requirements
2763 vgname = cfg.GetVGName()
2764 if template_name == constants.DT_DISKLESS:
2766 elif template_name == constants.DT_PLAIN:
2767 if len(secondary_nodes) != 0:
2768 raise errors.ProgrammerError("Wrong template configuration")
2770 names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
2771 sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
2772 logical_id=(vgname, names[0]),
2774 sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
2775 logical_id=(vgname, names[1]),
2777 disks = [sda_dev, sdb_dev]
2778 elif template_name == constants.DT_DRBD8:
2779 if len(secondary_nodes) != 1:
2780 raise errors.ProgrammerError("Wrong template configuration")
2781 remote_node = secondary_nodes[0]
2782 names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
2783 ".sdb_data", ".sdb_meta"])
2784 drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2785 disk_sz, names[0:2], "sda")
2786 drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
2787 swap_sz, names[2:4], "sdb")
2788 disks = [drbd_sda_dev, drbd_sdb_dev]
2789 elif template_name == constants.DT_FILE:
2790 if len(secondary_nodes) != 0:
2791 raise errors.ProgrammerError("Wrong template configuration")
2793 file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
2794 iv_name="sda", logical_id=(file_driver,
2795 "%s/sda" % file_storage_dir))
2796 file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
2797 iv_name="sdb", logical_id=(file_driver,
2798 "%s/sdb" % file_storage_dir))
2799 disks = [file_sda_dev, file_sdb_dev]
2801 raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
2805 def _GetInstanceInfoText(instance):
2806 """Compute that text that should be added to the disk's metadata.
2809 return "originstname+%s" % instance.name
2812 def _CreateDisks(cfg, instance):
2813 """Create all disks for an instance.
2815 This abstracts away some work from AddInstance.
2818 instance: the instance object
2821 True or False showing the success of the creation process
2824 info = _GetInstanceInfoText(instance)
2826 if instance.disk_template == constants.DT_FILE:
2827 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2828 result = rpc.call_file_storage_dir_create(instance.primary_node,
2832 logger.Error("Could not connect to node '%s'" % instance.primary_node)
2836 logger.Error("failed to create directory '%s'" % file_storage_dir)
2839 for device in instance.disks:
2840 logger.Info("creating volume %s for instance %s" %
2841 (device.iv_name, instance.name))
2843 for secondary_node in instance.secondary_nodes:
2844 if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
2845 device, False, info):
2846 logger.Error("failed to create volume %s (%s) on secondary node %s!" %
2847 (device.iv_name, device, secondary_node))
2850 if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
2851 instance, device, info):
2852 logger.Error("failed to create volume %s on primary!" %
2859 def _RemoveDisks(instance, cfg):
2860 """Remove all disks for an instance.
2862 This abstracts away some work from `AddInstance()` and
2863 `RemoveInstance()`. Note that in case some of the devices couldn't
2864 be removed, the removal will continue with the other ones (compare
2865 with `_CreateDisks()`).
2868 instance: the instance object
2871 True or False showing the success of the removal proces
2874 logger.Info("removing block devices for instance %s" % instance.name)
2877 for device in instance.disks:
2878 for node, disk in device.ComputeNodeTree(instance.primary_node):
2879 cfg.SetDiskID(disk, node)
2880 if not rpc.call_blockdev_remove(node, disk):
2881 logger.Error("could not remove block device %s on node %s,"
2882 " continuing anyway" %
2883 (device.iv_name, node))
2886 if instance.disk_template == constants.DT_FILE:
2887 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
2888 if not rpc.call_file_storage_dir_remove(instance.primary_node,
2890 logger.Error("could not remove directory '%s'" % file_storage_dir)
2896 def _ComputeDiskSize(disk_template, disk_size, swap_size):
2897 """Compute disk size requirements in the volume group
2899 This is currently hard-coded for the two-drive layout.
2902 # Required free disk space as a function of disk and swap space
2904 constants.DT_DISKLESS: None,
2905 constants.DT_PLAIN: disk_size + swap_size,
2906 # 256 MB are added for drbd metadata, 128MB for each drbd device
2907 constants.DT_DRBD8: disk_size + swap_size + 256,
2908 constants.DT_FILE: None,
2911 if disk_template not in req_size_dict:
2912 raise errors.ProgrammerError("Disk template '%s' size requirement"
2913 " is unknown" % disk_template)
2915 return req_size_dict[disk_template]
2918 class LUCreateInstance(LogicalUnit):
2919 """Create an instance.
2922 HPATH = "instance-add"
2923 HTYPE = constants.HTYPE_INSTANCE
2924 _OP_REQP = ["instance_name", "mem_size", "disk_size",
2925 "disk_template", "swap_size", "mode", "start", "vcpus",
2926 "wait_for_sync", "ip_check", "mac"]
2928 def _RunAllocator(self):
2929 """Run the allocator based on input opcode.
2932 disks = [{"size": self.op.disk_size, "mode": "w"},
2933 {"size": self.op.swap_size, "mode": "w"}]
2934 nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
2935 "bridge": self.op.bridge}]
2936 ial = IAllocator(self.cfg, self.sstore,
2937 mode=constants.IALLOCATOR_MODE_ALLOC,
2938 name=self.op.instance_name,
2939 disk_template=self.op.disk_template,
2942 vcpus=self.op.vcpus,
2943 mem_size=self.op.mem_size,
2948 ial.Run(self.op.iallocator)
2951 raise errors.OpPrereqError("Can't compute nodes using"
2952 " iallocator '%s': %s" % (self.op.iallocator,
2954 if len(ial.nodes) != ial.required_nodes:
2955 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
2956 " of nodes (%s), required %s" %
2957 (len(ial.nodes), ial.required_nodes))
2958 self.op.pnode = ial.nodes[0]
2959 logger.ToStdout("Selected nodes for the instance: %s" %
2960 (", ".join(ial.nodes),))
2961 logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
2962 (self.op.instance_name, self.op.iallocator, ial.nodes))
2963 if ial.required_nodes == 2:
2964 self.op.snode = ial.nodes[1]
2966 def BuildHooksEnv(self):
2969 This runs on master, primary and secondary nodes of the instance.
2973 "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
2974 "INSTANCE_DISK_SIZE": self.op.disk_size,
2975 "INSTANCE_SWAP_SIZE": self.op.swap_size,
2976 "INSTANCE_ADD_MODE": self.op.mode,
2978 if self.op.mode == constants.INSTANCE_IMPORT:
2979 env["INSTANCE_SRC_NODE"] = self.op.src_node
2980 env["INSTANCE_SRC_PATH"] = self.op.src_path
2981 env["INSTANCE_SRC_IMAGE"] = self.src_image
2983 env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
2984 primary_node=self.op.pnode,
2985 secondary_nodes=self.secondaries,
2986 status=self.instance_status,
2987 os_type=self.op.os_type,
2988 memory=self.op.mem_size,
2989 vcpus=self.op.vcpus,
2990 nics=[(self.inst_ip, self.op.bridge, self.op.mac)],
2993 nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
2998 def CheckPrereq(self):
2999 """Check prerequisites.
3002 # set optional parameters to none if they don't exist
3003 for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
3004 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
3005 "vnc_bind_address"]:
3006 if not hasattr(self.op, attr):
3007 setattr(self.op, attr, None)
3009 if self.op.mode not in (constants.INSTANCE_CREATE,
3010 constants.INSTANCE_IMPORT):
3011 raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
3014 if (not self.cfg.GetVGName() and
3015 self.op.disk_template not in constants.DTS_NOT_LVM):
3016 raise errors.OpPrereqError("Cluster does not support lvm-based"
3019 if self.op.mode == constants.INSTANCE_IMPORT:
3020 src_node = getattr(self.op, "src_node", None)
3021 src_path = getattr(self.op, "src_path", None)
3022 if src_node is None or src_path is None:
3023 raise errors.OpPrereqError("Importing an instance requires source"
3024 " node and path options")
3025 src_node_full = self.cfg.ExpandNodeName(src_node)
3026 if src_node_full is None:
3027 raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
3028 self.op.src_node = src_node = src_node_full
3030 if not os.path.isabs(src_path):
3031 raise errors.OpPrereqError("The source path must be absolute")
3033 export_info = rpc.call_export_info(src_node, src_path)
3036 raise errors.OpPrereqError("No export found in dir %s" % src_path)
3038 if not export_info.has_section(constants.INISECT_EXP):
3039 raise errors.ProgrammerError("Corrupted export config")
3041 ei_version = export_info.get(constants.INISECT_EXP, 'version')
3042 if (int(ei_version) != constants.EXPORT_VERSION):
3043 raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
3044 (ei_version, constants.EXPORT_VERSION))
3046 if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
3047 raise errors.OpPrereqError("Can't import instance with more than"
3050 # FIXME: are the old os-es, disk sizes, etc. useful?
3051 self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
3052 diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
3054 self.src_image = diskimage
3055 else: # INSTANCE_CREATE
3056 if getattr(self.op, "os_type", None) is None:
3057 raise errors.OpPrereqError("No guest OS specified")
3059 #### instance parameters check
3061 # disk template and mirror node verification
3062 if self.op.disk_template not in constants.DISK_TEMPLATES:
3063 raise errors.OpPrereqError("Invalid disk template name")
3065 # instance name verification
3066 hostname1 = utils.HostInfo(self.op.instance_name)
3068 self.op.instance_name = instance_name = hostname1.name
3069 instance_list = self.cfg.GetInstanceList()
3070 if instance_name in instance_list:
3071 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3074 # ip validity checks
3075 ip = getattr(self.op, "ip", None)
3076 if ip is None or ip.lower() == "none":
3078 elif ip.lower() == "auto":
3079 inst_ip = hostname1.ip
3081 if not utils.IsValidIP(ip):
3082 raise errors.OpPrereqError("given IP address '%s' doesn't look"
3083 " like a valid IP" % ip)
3085 self.inst_ip = self.op.ip = inst_ip
3087 if self.op.start and not self.op.ip_check:
3088 raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
3089 " adding an instance in start mode")
3091 if self.op.ip_check:
3092 if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
3093 raise errors.OpPrereqError("IP %s of instance %s already in use" %
3094 (hostname1.ip, instance_name))
3096 # MAC address verification
3097 if self.op.mac != "auto":
3098 if not utils.IsValidMac(self.op.mac.lower()):
3099 raise errors.OpPrereqError("invalid MAC address specified: %s" %
3102 # bridge verification
3103 bridge = getattr(self.op, "bridge", None)
3105 self.op.bridge = self.cfg.GetDefBridge()
3107 self.op.bridge = bridge
3109 # boot order verification
3110 if self.op.hvm_boot_order is not None:
3111 if len(self.op.hvm_boot_order.strip("acdn")) != 0:
3112 raise errors.OpPrereqError("invalid boot order specified,"
3113 " must be one or more of [acdn]")
3114 # file storage checks
3115 if (self.op.file_driver and
3116 not self.op.file_driver in constants.FILE_DRIVER):
3117 raise errors.OpPrereqError("Invalid file driver name '%s'" %
3118 self.op.file_driver)
3120 if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
3121 raise errors.OpPrereqError("File storage directory not a relative"
3125 if [self.op.iallocator, self.op.pnode].count(None) != 1:
3126 raise errors.OpPrereqError("One and only one of iallocator and primary"
3127 " node must be given")
3129 if self.op.iallocator is not None:
3130 self._RunAllocator()
3132 #### node related checks
3134 # check primary node
3135 pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
3137 raise errors.OpPrereqError("Primary node '%s' is unknown" %
3139 self.op.pnode = pnode.name
3141 self.secondaries = []
3143 # mirror node verification
3144 if self.op.disk_template in constants.DTS_NET_MIRROR:
3145 if getattr(self.op, "snode", None) is None:
3146 raise errors.OpPrereqError("The networked disk templates need"
3149 snode_name = self.cfg.ExpandNodeName(self.op.snode)
3150 if snode_name is None:
3151 raise errors.OpPrereqError("Unknown secondary node '%s'" %
3153 elif snode_name == pnode.name:
3154 raise errors.OpPrereqError("The secondary node cannot be"
3155 " the primary node.")
3156 self.secondaries.append(snode_name)
3158 req_size = _ComputeDiskSize(self.op.disk_template,
3159 self.op.disk_size, self.op.swap_size)
3161 # Check lv size requirements
3162 if req_size is not None:
3163 nodenames = [pnode.name] + self.secondaries
3164 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3165 for node in nodenames:
3166 info = nodeinfo.get(node, None)
3168 raise errors.OpPrereqError("Cannot get current information"
3169 " from node '%s'" % node)
3170 vg_free = info.get('vg_free', None)
3171 if not isinstance(vg_free, int):
3172 raise errors.OpPrereqError("Can't compute free disk space on"
3174 if req_size > info['vg_free']:
3175 raise errors.OpPrereqError("Not enough disk space on target node %s."
3176 " %d MB available, %d MB required" %
3177 (node, info['vg_free'], req_size))
3180 os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
3182 raise errors.OpPrereqError("OS '%s' not in supported os list for"
3183 " primary node" % self.op.os_type)
3185 if self.op.kernel_path == constants.VALUE_NONE:
3186 raise errors.OpPrereqError("Can't set instance kernel to none")
3189 # bridge check on primary node
3190 if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
3191 raise errors.OpPrereqError("target bridge '%s' does not exist on"
3192 " destination node '%s'" %
3193 (self.op.bridge, pnode.name))
3195 # memory check on primary node
3197 _CheckNodeFreeMemory(self.cfg, self.pnode.name,
3198 "creating instance %s" % self.op.instance_name,
3201 # hvm_cdrom_image_path verification
3202 if self.op.hvm_cdrom_image_path is not None:
3203 if not os.path.isabs(self.op.hvm_cdrom_image_path):
3204 raise errors.OpPrereqError("The path to the HVM CDROM image must"
3205 " be an absolute path or None, not %s" %
3206 self.op.hvm_cdrom_image_path)
3207 if not os.path.isfile(self.op.hvm_cdrom_image_path):
3208 raise errors.OpPrereqError("The HVM CDROM image must either be a"
3209 " regular file or a symlink pointing to"
3210 " an existing regular file, not %s" %
3211 self.op.hvm_cdrom_image_path)
3213 # vnc_bind_address verification
3214 if self.op.vnc_bind_address is not None:
3215 if not utils.IsValidIP(self.op.vnc_bind_address):
3216 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
3217 " like a valid IP address" %
3218 self.op.vnc_bind_address)
3221 self.instance_status = 'up'
3223 self.instance_status = 'down'
3225 def Exec(self, feedback_fn):
3226 """Create and add the instance to the cluster.
3229 instance = self.op.instance_name
3230 pnode_name = self.pnode.name
3232 if self.op.mac == "auto":
3233 mac_address = self.cfg.GenerateMAC()
3235 mac_address = self.op.mac
3237 nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
3238 if self.inst_ip is not None:
3239 nic.ip = self.inst_ip
3241 ht_kind = self.sstore.GetHypervisorType()
3242 if ht_kind in constants.HTS_REQ_PORT:
3243 network_port = self.cfg.AllocatePort()
3247 if self.op.vnc_bind_address is None:
3248 self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
3250 # this is needed because os.path.join does not accept None arguments
3251 if self.op.file_storage_dir is None:
3252 string_file_storage_dir = ""
3254 string_file_storage_dir = self.op.file_storage_dir
3256 # build the full file storage dir path
3257 file_storage_dir = os.path.normpath(os.path.join(
3258 self.sstore.GetFileStorageDir(),
3259 string_file_storage_dir, instance))
3262 disks = _GenerateDiskTemplate(self.cfg,
3263 self.op.disk_template,
3264 instance, pnode_name,
3265 self.secondaries, self.op.disk_size,
3268 self.op.file_driver)
3270 iobj = objects.Instance(name=instance, os=self.op.os_type,
3271 primary_node=pnode_name,
3272 memory=self.op.mem_size,
3273 vcpus=self.op.vcpus,
3274 nics=[nic], disks=disks,
3275 disk_template=self.op.disk_template,
3276 status=self.instance_status,
3277 network_port=network_port,
3278 kernel_path=self.op.kernel_path,
3279 initrd_path=self.op.initrd_path,
3280 hvm_boot_order=self.op.hvm_boot_order,
3281 hvm_acpi=self.op.hvm_acpi,
3282 hvm_pae=self.op.hvm_pae,
3283 hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
3284 vnc_bind_address=self.op.vnc_bind_address,
3287 feedback_fn("* creating instance disks...")
3288 if not _CreateDisks(self.cfg, iobj):
3289 _RemoveDisks(iobj, self.cfg)
3290 raise errors.OpExecError("Device creation failed, reverting...")
3292 feedback_fn("adding instance %s to cluster config" % instance)
3294 self.cfg.AddInstance(iobj)
3295 # Add the new instance to the Ganeti Lock Manager
3296 self.context.glm.add(locking.LEVEL_INSTANCE, instance)
3298 if self.op.wait_for_sync:
3299 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
3300 elif iobj.disk_template in constants.DTS_NET_MIRROR:
3301 # make sure the disks are not degraded (still sync-ing is ok)
3303 feedback_fn("* checking mirrors status")
3304 disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
3309 _RemoveDisks(iobj, self.cfg)
3310 self.cfg.RemoveInstance(iobj.name)
3311 # Remove the new instance from the Ganeti Lock Manager
3312 self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
3313 raise errors.OpExecError("There are some degraded disks for"
3316 feedback_fn("creating os for instance %s on node %s" %
3317 (instance, pnode_name))
3319 if iobj.disk_template != constants.DT_DISKLESS:
3320 if self.op.mode == constants.INSTANCE_CREATE:
3321 feedback_fn("* running the instance OS create scripts...")
3322 if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
3323 raise errors.OpExecError("could not add os for instance %s"
3325 (instance, pnode_name))
3327 elif self.op.mode == constants.INSTANCE_IMPORT:
3328 feedback_fn("* running the instance OS import scripts...")
3329 src_node = self.op.src_node
3330 src_image = self.src_image
3331 if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
3332 src_node, src_image):
3333 raise errors.OpExecError("Could not import os for instance"
3335 (instance, pnode_name))
3337 # also checked in the prereq part
3338 raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
3342 logger.Info("starting instance %s on node %s" % (instance, pnode_name))
3343 feedback_fn("* starting instance...")
3344 if not rpc.call_instance_start(pnode_name, iobj, None):
3345 raise errors.OpExecError("Could not start instance")
3348 class LUConnectConsole(NoHooksLU):
3349 """Connect to an instance's console.
3351 This is somewhat special in that it returns the command line that
3352 you need to run on the master node in order to connect to the
3356 _OP_REQP = ["instance_name"]
3359 def ExpandNames(self):
3360 self._ExpandAndLockInstance()
3362 def CheckPrereq(self):
3363 """Check prerequisites.
3365 This checks that the instance is in the cluster.
3368 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3369 assert self.instance is not None, \
3370 "Cannot retrieve locked instance %s" % self.op.instance_name
3372 def Exec(self, feedback_fn):
3373 """Connect to the console of an instance
3376 instance = self.instance
3377 node = instance.primary_node
3379 node_insts = rpc.call_instance_list([node])[node]
3380 if node_insts is False:
3381 raise errors.OpExecError("Can't connect to node %s." % node)
3383 if instance.name not in node_insts:
3384 raise errors.OpExecError("Instance %s is not running." % instance.name)
3386 logger.Debug("connecting to console of %s on %s" % (instance.name, node))
3388 hyper = hypervisor.GetHypervisor()
3389 console_cmd = hyper.GetShellCommandForConsole(instance)
3392 return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
3395 class LUReplaceDisks(LogicalUnit):
3396 """Replace the disks of an instance.
3399 HPATH = "mirrors-replace"
3400 HTYPE = constants.HTYPE_INSTANCE
3401 _OP_REQP = ["instance_name", "mode", "disks"]
3403 def _RunAllocator(self):
3404 """Compute a new secondary node using an IAllocator.
3407 ial = IAllocator(self.cfg, self.sstore,
3408 mode=constants.IALLOCATOR_MODE_RELOC,
3409 name=self.op.instance_name,
3410 relocate_from=[self.sec_node])
3412 ial.Run(self.op.iallocator)
3415 raise errors.OpPrereqError("Can't compute nodes using"
3416 " iallocator '%s': %s" % (self.op.iallocator,
3418 if len(ial.nodes) != ial.required_nodes:
3419 raise errors.OpPrereqError("iallocator '%s' returned invalid number"
3420 " of nodes (%s), required %s" %
3421 (len(ial.nodes), ial.required_nodes))
3422 self.op.remote_node = ial.nodes[0]
3423 logger.ToStdout("Selected new secondary for the instance: %s" %
3424 self.op.remote_node)
3426 def BuildHooksEnv(self):
3429 This runs on the master, the primary and all the secondaries.
3433 "MODE": self.op.mode,
3434 "NEW_SECONDARY": self.op.remote_node,
3435 "OLD_SECONDARY": self.instance.secondary_nodes[0],
3437 env.update(_BuildInstanceHookEnvByObject(self.instance))
3439 self.sstore.GetMasterNode(),
3440 self.instance.primary_node,
3442 if self.op.remote_node is not None:
3443 nl.append(self.op.remote_node)
3446 def CheckPrereq(self):
3447 """Check prerequisites.
3449 This checks that the instance is in the cluster.
3452 if not hasattr(self.op, "remote_node"):
3453 self.op.remote_node = None
3455 instance = self.cfg.GetInstanceInfo(
3456 self.cfg.ExpandInstanceName(self.op.instance_name))
3457 if instance is None:
3458 raise errors.OpPrereqError("Instance '%s' not known" %
3459 self.op.instance_name)
3460 self.instance = instance
3461 self.op.instance_name = instance.name
3463 if instance.disk_template not in constants.DTS_NET_MIRROR:
3464 raise errors.OpPrereqError("Instance's disk layout is not"
3465 " network mirrored.")
3467 if len(instance.secondary_nodes) != 1:
3468 raise errors.OpPrereqError("The instance has a strange layout,"
3469 " expected one secondary but found %d" %
3470 len(instance.secondary_nodes))
3472 self.sec_node = instance.secondary_nodes[0]
3474 ia_name = getattr(self.op, "iallocator", None)
3475 if ia_name is not None:
3476 if self.op.remote_node is not None:
3477 raise errors.OpPrereqError("Give either the iallocator or the new"
3478 " secondary, not both")
3479 self.op.remote_node = self._RunAllocator()
3481 remote_node = self.op.remote_node
3482 if remote_node is not None:
3483 remote_node = self.cfg.ExpandNodeName(remote_node)
3484 if remote_node is None:
3485 raise errors.OpPrereqError("Node '%s' not known" %
3486 self.op.remote_node)
3487 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
3489 self.remote_node_info = None
3490 if remote_node == instance.primary_node:
3491 raise errors.OpPrereqError("The specified node is the primary node of"
3493 elif remote_node == self.sec_node:
3494 if self.op.mode == constants.REPLACE_DISK_SEC:
3495 # this is for DRBD8, where we can't execute the same mode of
3496 # replacement as for drbd7 (no different port allocated)
3497 raise errors.OpPrereqError("Same secondary given, cannot execute"
3499 if instance.disk_template == constants.DT_DRBD8:
3500 if (self.op.mode == constants.REPLACE_DISK_ALL and
3501 remote_node is not None):
3502 # switch to replace secondary mode
3503 self.op.mode = constants.REPLACE_DISK_SEC
3505 if self.op.mode == constants.REPLACE_DISK_ALL:
3506 raise errors.OpPrereqError("Template 'drbd' only allows primary or"
3507 " secondary disk replacement, not"
3509 elif self.op.mode == constants.REPLACE_DISK_PRI:
3510 if remote_node is not None:
3511 raise errors.OpPrereqError("Template 'drbd' does not allow changing"
3512 " the secondary while doing a primary"
3513 " node disk replacement")
3514 self.tgt_node = instance.primary_node
3515 self.oth_node = instance.secondary_nodes[0]
3516 elif self.op.mode == constants.REPLACE_DISK_SEC:
3517 self.new_node = remote_node # this can be None, in which case
3518 # we don't change the secondary
3519 self.tgt_node = instance.secondary_nodes[0]
3520 self.oth_node = instance.primary_node
3522 raise errors.ProgrammerError("Unhandled disk replace mode")
3524 for name in self.op.disks:
3525 if instance.FindDisk(name) is None:
3526 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3527 (name, instance.name))
3528 self.op.remote_node = remote_node
3530 def _ExecD8DiskOnly(self, feedback_fn):
3531 """Replace a disk on the primary or secondary for dbrd8.
3533 The algorithm for replace is quite complicated:
3534 - for each disk to be replaced:
3535 - create new LVs on the target node with unique names
3536 - detach old LVs from the drbd device
3537 - rename old LVs to name_replaced.<time_t>
3538 - rename new LVs to old LVs
3539 - attach the new LVs (with the old names now) to the drbd device
3540 - wait for sync across all devices
3541 - for each modified disk:
3542 - remove old LVs (which have the name name_replaces.<time_t>)
3544 Failures are not very well handled.
3548 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3549 instance = self.instance
3551 vgname = self.cfg.GetVGName()
3554 tgt_node = self.tgt_node
3555 oth_node = self.oth_node
3557 # Step: check device activation
3558 self.proc.LogStep(1, steps_total, "check device existence")
3559 info("checking volume groups")
3560 my_vg = cfg.GetVGName()
3561 results = rpc.call_vg_list([oth_node, tgt_node])
3563 raise errors.OpExecError("Can't list volume groups on the nodes")
3564 for node in oth_node, tgt_node:
3565 res = results.get(node, False)
3566 if not res or my_vg not in res:
3567 raise errors.OpExecError("Volume group '%s' not found on %s" %
3569 for dev in instance.disks:
3570 if not dev.iv_name in self.op.disks:
3572 for node in tgt_node, oth_node:
3573 info("checking %s on %s" % (dev.iv_name, node))
3574 cfg.SetDiskID(dev, node)
3575 if not rpc.call_blockdev_find(node, dev):
3576 raise errors.OpExecError("Can't find device %s on node %s" %
3577 (dev.iv_name, node))
3579 # Step: check other node consistency
3580 self.proc.LogStep(2, steps_total, "check peer consistency")
3581 for dev in instance.disks:
3582 if not dev.iv_name in self.op.disks:
3584 info("checking %s consistency on %s" % (dev.iv_name, oth_node))
3585 if not _CheckDiskConsistency(self.cfg, dev, oth_node,
3586 oth_node==instance.primary_node):
3587 raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
3588 " to replace disks on this node (%s)" %
3589 (oth_node, tgt_node))
3591 # Step: create new storage
3592 self.proc.LogStep(3, steps_total, "allocate new storage")
3593 for dev in instance.disks:
3594 if not dev.iv_name in self.op.disks:
3597 cfg.SetDiskID(dev, tgt_node)
3598 lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
3599 names = _GenerateUniqueNames(cfg, lv_names)
3600 lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
3601 logical_id=(vgname, names[0]))
3602 lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
3603 logical_id=(vgname, names[1]))
3604 new_lvs = [lv_data, lv_meta]
3605 old_lvs = dev.children
3606 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
3607 info("creating new local storage on %s for %s" %
3608 (tgt_node, dev.iv_name))
3609 # since we *always* want to create this LV, we use the
3610 # _Create...OnPrimary (which forces the creation), even if we
3611 # are talking about the secondary node
3612 for new_lv in new_lvs:
3613 if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
3614 _GetInstanceInfoText(instance)):
3615 raise errors.OpExecError("Failed to create new LV named '%s' on"
3617 (new_lv.logical_id[1], tgt_node))
3619 # Step: for each lv, detach+rename*2+attach
3620 self.proc.LogStep(4, steps_total, "change drbd configuration")
3621 for dev, old_lvs, new_lvs in iv_names.itervalues():
3622 info("detaching %s drbd from local storage" % dev.iv_name)
3623 if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
3624 raise errors.OpExecError("Can't detach drbd from local storage on node"
3625 " %s for device %s" % (tgt_node, dev.iv_name))
3627 #cfg.Update(instance)
3629 # ok, we created the new LVs, so now we know we have the needed
3630 # storage; as such, we proceed on the target node to rename
3631 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
3632 # using the assumption that logical_id == physical_id (which in
3633 # turn is the unique_id on that node)
3635 # FIXME(iustin): use a better name for the replaced LVs
3636 temp_suffix = int(time.time())
3637 ren_fn = lambda d, suff: (d.physical_id[0],
3638 d.physical_id[1] + "_replaced-%s" % suff)
3639 # build the rename list based on what LVs exist on the node
3641 for to_ren in old_lvs:
3642 find_res = rpc.call_blockdev_find(tgt_node, to_ren)
3643 if find_res is not None: # device exists
3644 rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
3646 info("renaming the old LVs on the target node")
3647 if not rpc.call_blockdev_rename(tgt_node, rlist):
3648 raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
3649 # now we rename the new LVs to the old LVs
3650 info("renaming the new LVs on the target node")
3651 rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
3652 if not rpc.call_blockdev_rename(tgt_node, rlist):
3653 raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
3655 for old, new in zip(old_lvs, new_lvs):
3656 new.logical_id = old.logical_id
3657 cfg.SetDiskID(new, tgt_node)
3659 for disk in old_lvs:
3660 disk.logical_id = ren_fn(disk, temp_suffix)
3661 cfg.SetDiskID(disk, tgt_node)
3663 # now that the new lvs have the old name, we can add them to the device
3664 info("adding new mirror component on %s" % tgt_node)
3665 if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
3666 for new_lv in new_lvs:
3667 if not rpc.call_blockdev_remove(tgt_node, new_lv):
3668 warning("Can't rollback device %s", hint="manually cleanup unused"
3670 raise errors.OpExecError("Can't add local storage to drbd")
3672 dev.children = new_lvs
3673 cfg.Update(instance)
3675 # Step: wait for sync
3677 # this can fail as the old devices are degraded and _WaitForSync
3678 # does a combined result over all disks, so we don't check its
3680 self.proc.LogStep(5, steps_total, "sync devices")
3681 _WaitForSync(cfg, instance, self.proc, unlock=True)
3683 # so check manually all the devices
3684 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3685 cfg.SetDiskID(dev, instance.primary_node)
3686 is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
3688 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3690 # Step: remove old storage
3691 self.proc.LogStep(6, steps_total, "removing old storage")
3692 for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
3693 info("remove logical volumes for %s" % name)
3695 cfg.SetDiskID(lv, tgt_node)
3696 if not rpc.call_blockdev_remove(tgt_node, lv):
3697 warning("Can't remove old LV", hint="manually remove unused LVs")
3700 def _ExecD8Secondary(self, feedback_fn):
3701 """Replace the secondary node for drbd8.
3703 The algorithm for replace is quite complicated:
3704 - for all disks of the instance:
3705 - create new LVs on the new node with same names
3706 - shutdown the drbd device on the old secondary
3707 - disconnect the drbd network on the primary
3708 - create the drbd device on the new secondary
3709 - network attach the drbd on the primary, using an artifice:
3710 the drbd code for Attach() will connect to the network if it
3711 finds a device which is connected to the good local disks but
3713 - wait for sync across all devices
3714 - remove all disks from the old secondary
3716 Failures are not very well handled.
3720 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3721 instance = self.instance
3723 vgname = self.cfg.GetVGName()
3726 old_node = self.tgt_node
3727 new_node = self.new_node
3728 pri_node = instance.primary_node
3730 # Step: check device activation
3731 self.proc.LogStep(1, steps_total, "check device existence")
3732 info("checking volume groups")
3733 my_vg = cfg.GetVGName()
3734 results = rpc.call_vg_list([pri_node, new_node])
3736 raise errors.OpExecError("Can't list volume groups on the nodes")
3737 for node in pri_node, new_node:
3738 res = results.get(node, False)
3739 if not res or my_vg not in res:
3740 raise errors.OpExecError("Volume group '%s' not found on %s" %
3742 for dev in instance.disks:
3743 if not dev.iv_name in self.op.disks:
3745 info("checking %s on %s" % (dev.iv_name, pri_node))
3746 cfg.SetDiskID(dev, pri_node)
3747 if not rpc.call_blockdev_find(pri_node, dev):
3748 raise errors.OpExecError("Can't find device %s on node %s" %
3749 (dev.iv_name, pri_node))
3751 # Step: check other node consistency
3752 self.proc.LogStep(2, steps_total, "check peer consistency")
3753 for dev in instance.disks:
3754 if not dev.iv_name in self.op.disks:
3756 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3757 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3758 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3759 " unsafe to replace the secondary" %
3762 # Step: create new storage
3763 self.proc.LogStep(3, steps_total, "allocate new storage")
3764 for dev in instance.disks:
3766 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3767 # since we *always* want to create this LV, we use the
3768 # _Create...OnPrimary (which forces the creation), even if we
3769 # are talking about the secondary node
3770 for new_lv in dev.children:
3771 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3772 _GetInstanceInfoText(instance)):
3773 raise errors.OpExecError("Failed to create new LV named '%s' on"
3775 (new_lv.logical_id[1], new_node))
3777 iv_names[dev.iv_name] = (dev, dev.children)
3779 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3780 for dev in instance.disks:
3782 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3783 # create new devices on new_node
3784 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3785 logical_id=(pri_node, new_node,
3787 children=dev.children)
3788 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3790 _GetInstanceInfoText(instance)):
3791 raise errors.OpExecError("Failed to create new DRBD on"
3792 " node '%s'" % new_node)
3794 for dev in instance.disks:
3795 # we have new devices, shutdown the drbd on the old secondary
3796 info("shutting down drbd for %s on old node" % dev.iv_name)
3797 cfg.SetDiskID(dev, old_node)
3798 if not rpc.call_blockdev_shutdown(old_node, dev):
3799 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3800 hint="Please cleanup this device manually as soon as possible")
3802 info("detaching primary drbds from the network (=> standalone)")
3804 for dev in instance.disks:
3805 cfg.SetDiskID(dev, pri_node)
3806 # set the physical (unique in bdev terms) id to None, meaning
3807 # detach from network
3808 dev.physical_id = (None,) * len(dev.physical_id)
3809 # and 'find' the device, which will 'fix' it to match the
3811 if rpc.call_blockdev_find(pri_node, dev):
3814 warning("Failed to detach drbd %s from network, unusual case" %
3818 # no detaches succeeded (very unlikely)
3819 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3821 # if we managed to detach at least one, we update all the disks of
3822 # the instance to point to the new secondary
3823 info("updating instance configuration")
3824 for dev in instance.disks:
3825 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3826 cfg.SetDiskID(dev, pri_node)
3827 cfg.Update(instance)
3829 # and now perform the drbd attach
3830 info("attaching primary drbds to new secondary (standalone => connected)")
3832 for dev in instance.disks:
3833 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3834 # since the attach is smart, it's enough to 'find' the device,
3835 # it will automatically activate the network, if the physical_id
3837 cfg.SetDiskID(dev, pri_node)
3838 if not rpc.call_blockdev_find(pri_node, dev):
3839 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3840 "please do a gnt-instance info to see the status of disks")
3842 # this can fail as the old devices are degraded and _WaitForSync
3843 # does a combined result over all disks, so we don't check its
3845 self.proc.LogStep(5, steps_total, "sync devices")
3846 _WaitForSync(cfg, instance, self.proc, unlock=True)
3848 # so check manually all the devices
3849 for name, (dev, old_lvs) in iv_names.iteritems():
3850 cfg.SetDiskID(dev, pri_node)
3851 is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
3853 raise errors.OpExecError("DRBD device %s is degraded!" % name)
3855 self.proc.LogStep(6, steps_total, "removing old storage")
3856 for name, (dev, old_lvs) in iv_names.iteritems():
3857 info("remove logical volumes for %s" % name)
3859 cfg.SetDiskID(lv, old_node)
3860 if not rpc.call_blockdev_remove(old_node, lv):
3861 warning("Can't remove LV on old secondary",
3862 hint="Cleanup stale volumes by hand")
3864 def Exec(self, feedback_fn):
3865 """Execute disk replacement.
3867 This dispatches the disk replacement to the appropriate handler.
3870 instance = self.instance
3872 # Activate the instance disks if we're replacing them on a down instance
3873 if instance.status == "down":
3874 op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
3875 self.proc.ChainOpCode(op)
3877 if instance.disk_template == constants.DT_DRBD8:
3878 if self.op.remote_node is None:
3879 fn = self._ExecD8DiskOnly
3881 fn = self._ExecD8Secondary
3883 raise errors.ProgrammerError("Unhandled disk replacement case")
3885 ret = fn(feedback_fn)
3887 # Deactivate the instance disks if we're replacing them on a down instance
3888 if instance.status == "down":
3889 op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
3890 self.proc.ChainOpCode(op)
3895 class LUGrowDisk(LogicalUnit):
3896 """Grow a disk of an instance.
3900 HTYPE = constants.HTYPE_INSTANCE
3901 _OP_REQP = ["instance_name", "disk", "amount"]
3903 def BuildHooksEnv(self):
3906 This runs on the master, the primary and all the secondaries.
3910 "DISK": self.op.disk,
3911 "AMOUNT": self.op.amount,
3913 env.update(_BuildInstanceHookEnvByObject(self.instance))
3915 self.sstore.GetMasterNode(),
3916 self.instance.primary_node,
3920 def CheckPrereq(self):
3921 """Check prerequisites.
3923 This checks that the instance is in the cluster.
3926 instance = self.cfg.GetInstanceInfo(
3927 self.cfg.ExpandInstanceName(self.op.instance_name))
3928 if instance is None:
3929 raise errors.OpPrereqError("Instance '%s' not known" %
3930 self.op.instance_name)
3931 self.instance = instance
3932 self.op.instance_name = instance.name
3934 if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
3935 raise errors.OpPrereqError("Instance's disk layout does not support"
3938 if instance.FindDisk(self.op.disk) is None:
3939 raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
3940 (self.op.disk, instance.name))
3942 nodenames = [instance.primary_node] + list(instance.secondary_nodes)
3943 nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
3944 for node in nodenames:
3945 info = nodeinfo.get(node, None)
3947 raise errors.OpPrereqError("Cannot get current information"
3948 " from node '%s'" % node)
3949 vg_free = info.get('vg_free', None)
3950 if not isinstance(vg_free, int):
3951 raise errors.OpPrereqError("Can't compute free disk space on"
3953 if self.op.amount > info['vg_free']:
3954 raise errors.OpPrereqError("Not enough disk space on target node %s:"
3955 " %d MiB available, %d MiB required" %
3956 (node, info['vg_free'], self.op.amount))
3958 def Exec(self, feedback_fn):
3959 """Execute disk grow.
3962 instance = self.instance
3963 disk = instance.FindDisk(self.op.disk)
3964 for node in (instance.secondary_nodes + (instance.primary_node,)):
3965 self.cfg.SetDiskID(disk, node)
3966 result = rpc.call_blockdev_grow(node, disk, self.op.amount)
3967 if not result or not isinstance(result, tuple) or len(result) != 2:
3968 raise errors.OpExecError("grow request failed to node %s" % node)
3970 raise errors.OpExecError("grow request failed to node %s: %s" %
3972 disk.RecordGrow(self.op.amount)
3973 self.cfg.Update(instance)
3977 class LUQueryInstanceData(NoHooksLU):
3978 """Query runtime instance data.
3981 _OP_REQP = ["instances"]
3983 def CheckPrereq(self):
3984 """Check prerequisites.
3986 This only checks the optional instance list against the existing names.
3989 if not isinstance(self.op.instances, list):
3990 raise errors.OpPrereqError("Invalid argument type 'instances'")
3991 if self.op.instances:
3992 self.wanted_instances = []
3993 names = self.op.instances
3995 instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
3996 if instance is None:
3997 raise errors.OpPrereqError("No such instance name '%s'" % name)
3998 self.wanted_instances.append(instance)
4000 self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
4001 in self.cfg.GetInstanceList()]
4005 def _ComputeDiskStatus(self, instance, snode, dev):
4006 """Compute block device status.
4009 self.cfg.SetDiskID(dev, instance.primary_node)
4010 dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
4011 if dev.dev_type in constants.LDS_DRBD:
4012 # we change the snode then (otherwise we use the one passed in)
4013 if dev.logical_id[0] == instance.primary_node:
4014 snode = dev.logical_id[1]
4016 snode = dev.logical_id[0]
4019 self.cfg.SetDiskID(dev, snode)
4020 dev_sstatus = rpc.call_blockdev_find(snode, dev)
4025 dev_children = [self._ComputeDiskStatus(instance, snode, child)
4026 for child in dev.children]
4031 "iv_name": dev.iv_name,
4032 "dev_type": dev.dev_type,
4033 "logical_id": dev.logical_id,
4034 "physical_id": dev.physical_id,
4035 "pstatus": dev_pstatus,
4036 "sstatus": dev_sstatus,
4037 "children": dev_children,
4042 def Exec(self, feedback_fn):
4043 """Gather and return data"""
4045 for instance in self.wanted_instances:
4046 remote_info = rpc.call_instance_info(instance.primary_node,
4048 if remote_info and "state" in remote_info:
4051 remote_state = "down"
4052 if instance.status == "down":
4053 config_state = "down"
4057 disks = [self._ComputeDiskStatus(instance, None, device)
4058 for device in instance.disks]
4061 "name": instance.name,
4062 "config_state": config_state,
4063 "run_state": remote_state,
4064 "pnode": instance.primary_node,
4065 "snodes": instance.secondary_nodes,
4067 "memory": instance.memory,
4068 "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
4070 "vcpus": instance.vcpus,
4073 htkind = self.sstore.GetHypervisorType()
4074 if htkind == constants.HT_XEN_PVM30:
4075 idict["kernel_path"] = instance.kernel_path
4076 idict["initrd_path"] = instance.initrd_path
4078 if htkind == constants.HT_XEN_HVM31:
4079 idict["hvm_boot_order"] = instance.hvm_boot_order
4080 idict["hvm_acpi"] = instance.hvm_acpi
4081 idict["hvm_pae"] = instance.hvm_pae
4082 idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
4084 if htkind in constants.HTS_REQ_PORT:
4085 idict["vnc_bind_address"] = instance.vnc_bind_address
4086 idict["network_port"] = instance.network_port
4088 result[instance.name] = idict
4093 class LUSetInstanceParams(LogicalUnit):
4094 """Modifies an instances's parameters.
4097 HPATH = "instance-modify"
4098 HTYPE = constants.HTYPE_INSTANCE
4099 _OP_REQP = ["instance_name"]
4101 def BuildHooksEnv(self):
4104 This runs on the master, primary and secondaries.
4109 args['memory'] = self.mem
4111 args['vcpus'] = self.vcpus
4112 if self.do_ip or self.do_bridge or self.mac:
4116 ip = self.instance.nics[0].ip
4118 bridge = self.bridge
4120 bridge = self.instance.nics[0].bridge
4124 mac = self.instance.nics[0].mac
4125 args['nics'] = [(ip, bridge, mac)]
4126 env = _BuildInstanceHookEnvByObject(self.instance, override=args)
4127 nl = [self.sstore.GetMasterNode(),
4128 self.instance.primary_node] + list(self.instance.secondary_nodes)
4131 def CheckPrereq(self):
4132 """Check prerequisites.
4134 This only checks the instance list against the existing names.
4137 self.mem = getattr(self.op, "mem", None)
4138 self.vcpus = getattr(self.op, "vcpus", None)
4139 self.ip = getattr(self.op, "ip", None)
4140 self.mac = getattr(self.op, "mac", None)
4141 self.bridge = getattr(self.op, "bridge", None)
4142 self.kernel_path = getattr(self.op, "kernel_path", None)
4143 self.initrd_path = getattr(self.op, "initrd_path", None)
4144 self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
4145 self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
4146 self.hvm_pae = getattr(self.op, "hvm_pae", None)
4147 self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
4148 self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
4149 all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
4150 self.kernel_path, self.initrd_path, self.hvm_boot_order,
4151 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
4152 self.vnc_bind_address]
4153 if all_parms.count(None) == len(all_parms):
4154 raise errors.OpPrereqError("No changes submitted")
4155 if self.mem is not None:
4157 self.mem = int(self.mem)
4158 except ValueError, err:
4159 raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
4160 if self.vcpus is not None:
4162 self.vcpus = int(self.vcpus)
4163 except ValueError, err:
4164 raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
4165 if self.ip is not None:
4167 if self.ip.lower() == "none":
4170 if not utils.IsValidIP(self.ip):
4171 raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
4174 self.do_bridge = (self.bridge is not None)
4175 if self.mac is not None:
4176 if self.cfg.IsMacInUse(self.mac):
4177 raise errors.OpPrereqError('MAC address %s already in use in cluster' %
4179 if not utils.IsValidMac(self.mac):
4180 raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)
4182 if self.kernel_path is not None:
4183 self.do_kernel_path = True
4184 if self.kernel_path == constants.VALUE_NONE:
4185 raise errors.OpPrereqError("Can't set instance to no kernel")
4187 if self.kernel_path != constants.VALUE_DEFAULT:
4188 if not os.path.isabs(self.kernel_path):
4189 raise errors.OpPrereqError("The kernel path must be an absolute"
4192 self.do_kernel_path = False
4194 if self.initrd_path is not None:
4195 self.do_initrd_path = True
4196 if self.initrd_path not in (constants.VALUE_NONE,
4197 constants.VALUE_DEFAULT):
4198 if not os.path.isabs(self.initrd_path):
4199 raise errors.OpPrereqError("The initrd path must be an absolute"
4202 self.do_initrd_path = False
4204 # boot order verification
4205 if self.hvm_boot_order is not None:
4206 if self.hvm_boot_order != constants.VALUE_DEFAULT:
4207 if len(self.hvm_boot_order.strip("acdn")) != 0:
4208 raise errors.OpPrereqError("invalid boot order specified,"
4209 " must be one or more of [acdn]"
4212 # hvm_cdrom_image_path verification
4213 if self.op.hvm_cdrom_image_path is not None:
4214 if not os.path.isabs(self.op.hvm_cdrom_image_path):
4215 raise errors.OpPrereqError("The path to the HVM CDROM image must"
4216 " be an absolute path or None, not %s" %
4217 self.op.hvm_cdrom_image_path)
4218 if not os.path.isfile(self.op.hvm_cdrom_image_path):
4219 raise errors.OpPrereqError("The HVM CDROM image must either be a"
4220 " regular file or a symlink pointing to"
4221 " an existing regular file, not %s" %
4222 self.op.hvm_cdrom_image_path)
4224 # vnc_bind_address verification
4225 if self.op.vnc_bind_address is not None:
4226 if not utils.IsValidIP(self.op.vnc_bind_address):
4227 raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
4228 " like a valid IP address" %
4229 self.op.vnc_bind_address)
4231 instance = self.cfg.GetInstanceInfo(
4232 self.cfg.ExpandInstanceName(self.op.instance_name))
4233 if instance is None:
4234 raise errors.OpPrereqError("No such instance name '%s'" %
4235 self.op.instance_name)
4236 self.op.instance_name = instance.name
4237 self.instance = instance
4240 def Exec(self, feedback_fn):
4241 """Modifies an instance.
4243 All parameters take effect only at the next restart of the instance.
4246 instance = self.instance
4248 instance.memory = self.mem
4249 result.append(("mem", self.mem))
4251 instance.vcpus = self.vcpus
4252 result.append(("vcpus", self.vcpus))
4254 instance.nics[0].ip = self.ip
4255 result.append(("ip", self.ip))
4257 instance.nics[0].bridge = self.bridge
4258 result.append(("bridge", self.bridge))
4260 instance.nics[0].mac = self.mac
4261 result.append(("mac", self.mac))
4262 if self.do_kernel_path:
4263 instance.kernel_path = self.kernel_path
4264 result.append(("kernel_path", self.kernel_path))
4265 if self.do_initrd_path:
4266 instance.initrd_path = self.initrd_path
4267 result.append(("initrd_path", self.initrd_path))
4268 if self.hvm_boot_order:
4269 if self.hvm_boot_order == constants.VALUE_DEFAULT:
4270 instance.hvm_boot_order = None
4272 instance.hvm_boot_order = self.hvm_boot_order
4273 result.append(("hvm_boot_order", self.hvm_boot_order))
4275 instance.hvm_acpi = self.hvm_acpi
4276 result.append(("hvm_acpi", self.hvm_acpi))
4278 instance.hvm_pae = self.hvm_pae
4279 result.append(("hvm_pae", self.hvm_pae))
4280 if self.hvm_cdrom_image_path:
4281 instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
4282 result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
4283 if self.vnc_bind_address:
4284 instance.vnc_bind_address = self.vnc_bind_address
4285 result.append(("vnc_bind_address", self.vnc_bind_address))
4287 self.cfg.Update(instance)
4292 class LUQueryExports(NoHooksLU):
4293 """Query the exports list
4298 def CheckPrereq(self):
4299 """Check that the nodelist contains only existing nodes.
4302 self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
4304 def Exec(self, feedback_fn):
4305 """Compute the list of all the exported system images.
4308 a dictionary with the structure node->(export-list)
4309 where export-list is a list of the instances exported on
4313 return rpc.call_export_list(self.nodes)
4316 class LUExportInstance(LogicalUnit):
4317 """Export an instance to an image in the cluster.
4320 HPATH = "instance-export"
4321 HTYPE = constants.HTYPE_INSTANCE
4322 _OP_REQP = ["instance_name", "target_node", "shutdown"]
4324 def BuildHooksEnv(self):
4327 This will run on the master, primary node and target node.
4331 "EXPORT_NODE": self.op.target_node,
4332 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
4334 env.update(_BuildInstanceHookEnvByObject(self.instance))
4335 nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
4336 self.op.target_node]
4339 def CheckPrereq(self):
4340 """Check prerequisites.
4342 This checks that the instance and node names are valid.
4345 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4346 self.instance = self.cfg.GetInstanceInfo(instance_name)
4347 if self.instance is None:
4348 raise errors.OpPrereqError("Instance '%s' not found" %
4349 self.op.instance_name)
4352 dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
4353 self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
4355 if self.dst_node is None:
4356 raise errors.OpPrereqError("Destination node '%s' is unknown." %
4357 self.op.target_node)
4358 self.op.target_node = self.dst_node.name
4360 # instance disk type verification
4361 for disk in self.instance.disks:
4362 if disk.dev_type == constants.LD_FILE:
4363 raise errors.OpPrereqError("Export not supported for instances with"
4364 " file-based disks")
4366 def Exec(self, feedback_fn):
4367 """Export an instance to an image in the cluster.
4370 instance = self.instance
4371 dst_node = self.dst_node
4372 src_node = instance.primary_node
4373 if self.op.shutdown:
4374 # shutdown the instance, but not the disks
4375 if not rpc.call_instance_shutdown(src_node, instance):
4376 raise errors.OpExecError("Could not shutdown instance %s on node %s" %
4377 (instance.name, src_node))
4379 vgname = self.cfg.GetVGName()
4384 for disk in instance.disks:
4385 if disk.iv_name == "sda":
4386 # new_dev_name will be a snapshot of an lvm leaf of the one we passed
4387 new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)
4389 if not new_dev_name:
4390 logger.Error("could not snapshot block device %s on node %s" %
4391 (disk.logical_id[1], src_node))
4393 new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
4394 logical_id=(vgname, new_dev_name),
4395 physical_id=(vgname, new_dev_name),
4396 iv_name=disk.iv_name)
4397 snap_disks.append(new_dev)
4400 if self.op.shutdown and instance.status == "up":
4401 if not rpc.call_instance_start(src_node, instance, None):
4402 _ShutdownInstanceDisks(instance, self.cfg)
4403 raise errors.OpExecError("Could not start instance")
4405 # TODO: check for size
4407 for dev in snap_disks:
4408 if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
4409 logger.Error("could not export block device %s from node %s to node %s"
4410 % (dev.logical_id[1], src_node, dst_node.name))
4411 if not rpc.call_blockdev_remove(src_node, dev):
4412 logger.Error("could not remove snapshot block device %s from node %s" %
4413 (dev.logical_id[1], src_node))
4415 if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
4416 logger.Error("could not finalize export for instance %s on node %s" %
4417 (instance.name, dst_node.name))
4419 nodelist = self.cfg.GetNodeList()
4420 nodelist.remove(dst_node.name)
4422 # on one-node clusters nodelist will be empty after the removal
4423 # if we proceed the backup would be removed because OpQueryExports
4424 # substitutes an empty list with the full cluster node list.
4426 op = opcodes.OpQueryExports(nodes=nodelist)
4427 exportlist = self.proc.ChainOpCode(op)
4428 for node in exportlist:
4429 if instance.name in exportlist[node]:
4430 if not rpc.call_export_remove(node, instance.name):
4431 logger.Error("could not remove older export for instance %s"
4432 " on node %s" % (instance.name, node))
4435 class LURemoveExport(NoHooksLU):
4436 """Remove exports related to the named instance.
4439 _OP_REQP = ["instance_name"]
4441 def CheckPrereq(self):
4442 """Check prerequisites.
4446 def Exec(self, feedback_fn):
4447 """Remove any export.
4450 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
4451 # If the instance was not found we'll try with the name that was passed in.
4452 # This will only work if it was an FQDN, though.
4454 if not instance_name:
4456 instance_name = self.op.instance_name
4458 op = opcodes.OpQueryExports(nodes=[])
4459 exportlist = self.proc.ChainOpCode(op)
4461 for node in exportlist:
4462 if instance_name in exportlist[node]:
4464 if not rpc.call_export_remove(node, instance_name):
4465 logger.Error("could not remove export for instance %s"
4466 " on node %s" % (instance_name, node))
4468 if fqdn_warn and not found:
4469 feedback_fn("Export not found. If trying to remove an export belonging"
4470 " to a deleted instance please use its Fully Qualified"
4474 class TagsLU(NoHooksLU):
4477 This is an abstract class which is the parent of all the other tags LUs.
4480 def CheckPrereq(self):
4481 """Check prerequisites.
4484 if self.op.kind == constants.TAG_CLUSTER:
4485 self.target = self.cfg.GetClusterInfo()
4486 elif self.op.kind == constants.TAG_NODE:
4487 name = self.cfg.ExpandNodeName(self.op.name)
4489 raise errors.OpPrereqError("Invalid node name (%s)" %
4492 self.target = self.cfg.GetNodeInfo(name)
4493 elif self.op.kind == constants.TAG_INSTANCE:
4494 name = self.cfg.ExpandInstanceName(self.op.name)
4496 raise errors.OpPrereqError("Invalid instance name (%s)" %
4499 self.target = self.cfg.GetInstanceInfo(name)
4501 raise errors.OpPrereqError("Wrong tag type requested (%s)" %
4505 class LUGetTags(TagsLU):
4506 """Returns the tags of a given object.
4509 _OP_REQP = ["kind", "name"]
4511 def Exec(self, feedback_fn):
4512 """Returns the tag list.
4515 return self.target.GetTags()
4518 class LUSearchTags(NoHooksLU):
4519 """Searches the tags for a given pattern.
4522 _OP_REQP = ["pattern"]
4524 def CheckPrereq(self):
4525 """Check prerequisites.
4527 This checks the pattern passed for validity by compiling it.
4531 self.re = re.compile(self.op.pattern)
4532 except re.error, err:
4533 raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
4534 (self.op.pattern, err))
4536 def Exec(self, feedback_fn):
4537 """Returns the tag list.
4541 tgts = [("/cluster", cfg.GetClusterInfo())]
4542 ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
4543 tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
4544 nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
4545 tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
4547 for path, target in tgts:
4548 for tag in target.GetTags():
4549 if self.re.search(tag):
4550 results.append((path, tag))
4554 class LUAddTags(TagsLU):
4555 """Sets a tag on a given object.
4558 _OP_REQP = ["kind", "name", "tags"]
4560 def CheckPrereq(self):
4561 """Check prerequisites.
4563 This checks the type and length of the tag name and value.
4566 TagsLU.CheckPrereq(self)
4567 for tag in self.op.tags:
4568 objects.TaggableObject.ValidateTag(tag)
4570 def Exec(self, feedback_fn):
4575 for tag in self.op.tags:
4576 self.target.AddTag(tag)
4577 except errors.TagError, err:
4578 raise errors.OpExecError("Error while setting tag: %s" % str(err))
4580 self.cfg.Update(self.target)
4581 except errors.ConfigurationError:
4582 raise errors.OpRetryError("There has been a modification to the"
4583 " config file and the operation has been"
4584 " aborted. Please retry.")
4587 class LUDelTags(TagsLU):
4588 """Delete a list of tags from a given object.
4591 _OP_REQP = ["kind", "name", "tags"]
4593 def CheckPrereq(self):
4594 """Check prerequisites.
4596 This checks that we have the given tag.
4599 TagsLU.CheckPrereq(self)
4600 for tag in self.op.tags:
4601 objects.TaggableObject.ValidateTag(tag)
4602 del_tags = frozenset(self.op.tags)
4603 cur_tags = self.target.GetTags()
4604 if not del_tags <= cur_tags:
4605 diff_tags = del_tags - cur_tags
4606 diff_names = ["'%s'" % tag for tag in diff_tags]
4608 raise errors.OpPrereqError("Tag(s) %s not found" %
4609 (",".join(diff_names)))
4611 def Exec(self, feedback_fn):
4612 """Remove the tag from the object.
4615 for tag in self.op.tags:
4616 self.target.RemoveTag(tag)
4618 self.cfg.Update(self.target)
4619 except errors.ConfigurationError:
4620 raise errors.OpRetryError("There has been a modification to the"
4621 " config file and the operation has been"
4622 " aborted. Please retry.")
4625 class LUTestDelay(NoHooksLU):
4626 """Sleep for a specified amount of time.
4628 This LU sleeps on the master and/or nodes for a specified amount of
4632 _OP_REQP = ["duration", "on_master", "on_nodes"]
4635 def ExpandNames(self):
4636 """Expand names and set required locks.
4638 This expands the node list, if any.
4641 self.needed_locks = {}
4642 if self.op.on_nodes:
4643 # _GetWantedNodes can be used here, but is not always appropriate to use
4644 # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
4646 self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
4647 self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
4649 def CheckPrereq(self):
4650 """Check prerequisites.
4654 def Exec(self, feedback_fn):
4655 """Do the actual sleep.
4658 if self.op.on_master:
4659 if not utils.TestDelay(self.op.duration):
4660 raise errors.OpExecError("Error during master delay test")
4661 if self.op.on_nodes:
4662 result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
4664 raise errors.OpExecError("Complete failure from rpc call")
4665 for node, node_result in result.items():
4667 raise errors.OpExecError("Failure during rpc call to node %s,"
4668 " result: %s" % (node, node_result))
4671 class IAllocator(object):
4672 """IAllocator framework.
4674 An IAllocator instance has three sets of attributes:
4675 - cfg/sstore that are needed to query the cluster
4676 - input data (all members of the _KEYS class attribute are required)
4677 - four buffer attributes (in|out_data|text), that represent the
4678 input (to the external script) in text and data structure format,
4679 and the output from it, again in two formats
4680 - the result variables from the script (success, info, nodes) for
4685 "mem_size", "disks", "disk_template",
4686 "os", "tags", "nics", "vcpus",
4692 def __init__(self, cfg, sstore, mode, name, **kwargs):
4694 self.sstore = sstore
4695 # init buffer variables
4696 self.in_text = self.out_text = self.in_data = self.out_data = None
4697 # init all input fields so that pylint is happy
4700 self.mem_size = self.disks = self.disk_template = None
4701 self.os = self.tags = self.nics = self.vcpus = None
4702 self.relocate_from = None
4704 self.required_nodes = None
4705 # init result fields
4706 self.success = self.info = self.nodes = None
4707 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4708 keyset = self._ALLO_KEYS
4709 elif self.mode == constants.IALLOCATOR_MODE_RELOC:
4710 keyset = self._RELO_KEYS
4712 raise errors.ProgrammerError("Unknown mode '%s' passed to the"
4713 " IAllocator" % self.mode)
4715 if key not in keyset:
4716 raise errors.ProgrammerError("Invalid input parameter '%s' to"
4717 " IAllocator" % key)
4718 setattr(self, key, kwargs[key])
4720 if key not in kwargs:
4721 raise errors.ProgrammerError("Missing input parameter '%s' to"
4722 " IAllocator" % key)
4723 self._BuildInputData()
4725 def _ComputeClusterData(self):
4726 """Compute the generic allocator input data.
4728 This is the data that is independent of the actual operation.
4735 "cluster_name": self.sstore.GetClusterName(),
4736 "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
4737 "hypervisor_type": self.sstore.GetHypervisorType(),
4738 # we don't have job IDs
4741 i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]
4745 node_list = cfg.GetNodeList()
4746 node_data = rpc.call_node_info(node_list, cfg.GetVGName())
4747 for nname in node_list:
4748 ninfo = cfg.GetNodeInfo(nname)
4749 if nname not in node_data or not isinstance(node_data[nname], dict):
4750 raise errors.OpExecError("Can't get data for node %s" % nname)
4751 remote_info = node_data[nname]
4752 for attr in ['memory_total', 'memory_free', 'memory_dom0',
4753 'vg_size', 'vg_free', 'cpu_total']:
4754 if attr not in remote_info:
4755 raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
4758 remote_info[attr] = int(remote_info[attr])
4759 except ValueError, err:
4760 raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
4761 " %s" % (nname, attr, str(err)))
4762 # compute memory used by primary instances
4763 i_p_mem = i_p_up_mem = 0
4764 for iinfo in i_list:
4765 if iinfo.primary_node == nname:
4766 i_p_mem += iinfo.memory
4767 if iinfo.status == "up":
4768 i_p_up_mem += iinfo.memory
4770 # compute memory used by instances
4772 "tags": list(ninfo.GetTags()),
4773 "total_memory": remote_info['memory_total'],
4774 "reserved_memory": remote_info['memory_dom0'],
4775 "free_memory": remote_info['memory_free'],
4776 "i_pri_memory": i_p_mem,
4777 "i_pri_up_memory": i_p_up_mem,
4778 "total_disk": remote_info['vg_size'],
4779 "free_disk": remote_info['vg_free'],
4780 "primary_ip": ninfo.primary_ip,
4781 "secondary_ip": ninfo.secondary_ip,
4782 "total_cpus": remote_info['cpu_total'],
4784 node_results[nname] = pnr
4785 data["nodes"] = node_results
4789 for iinfo in i_list:
4790 nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
4791 for n in iinfo.nics]
4793 "tags": list(iinfo.GetTags()),
4794 "should_run": iinfo.status == "up",
4795 "vcpus": iinfo.vcpus,
4796 "memory": iinfo.memory,
4798 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
4800 "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
4801 "disk_template": iinfo.disk_template,
4803 instance_data[iinfo.name] = pir
4805 data["instances"] = instance_data
4809 def _AddNewInstance(self):
4810 """Add new instance data to allocator structure.
4812 This in combination with _AllocatorGetClusterData will create the
4813 correct structure needed as input for the allocator.
4815 The checks for the completeness of the opcode must have already been
4820 if len(self.disks) != 2:
4821 raise errors.OpExecError("Only two-disk configurations supported")
4823 disk_space = _ComputeDiskSize(self.disk_template,
4824 self.disks[0]["size"], self.disks[1]["size"])
4826 if self.disk_template in constants.DTS_NET_MIRROR:
4827 self.required_nodes = 2
4829 self.required_nodes = 1
4833 "disk_template": self.disk_template,
4836 "vcpus": self.vcpus,
4837 "memory": self.mem_size,
4838 "disks": self.disks,
4839 "disk_space_total": disk_space,
4841 "required_nodes": self.required_nodes,
4843 data["request"] = request
4845 def _AddRelocateInstance(self):
4846 """Add relocate instance data to allocator structure.
4848 This in combination with _IAllocatorGetClusterData will create the
4849 correct structure needed as input for the allocator.
4851 The checks for the completeness of the opcode must have already been
4855 instance = self.cfg.GetInstanceInfo(self.name)
4856 if instance is None:
4857 raise errors.ProgrammerError("Unknown instance '%s' passed to"
4858 " IAllocator" % self.name)
4860 if instance.disk_template not in constants.DTS_NET_MIRROR:
4861 raise errors.OpPrereqError("Can't relocate non-mirrored instances")
4863 if len(instance.secondary_nodes) != 1:
4864 raise errors.OpPrereqError("Instance has not exactly one secondary node")
4866 self.required_nodes = 1
4868 disk_space = _ComputeDiskSize(instance.disk_template,
4869 instance.disks[0].size,
4870 instance.disks[1].size)
4875 "disk_space_total": disk_space,
4876 "required_nodes": self.required_nodes,
4877 "relocate_from": self.relocate_from,
4879 self.in_data["request"] = request
4881 def _BuildInputData(self):
4882 """Build input data structures.
4885 self._ComputeClusterData()
4887 if self.mode == constants.IALLOCATOR_MODE_ALLOC:
4888 self._AddNewInstance()
4890 self._AddRelocateInstance()
4892 self.in_text = serializer.Dump(self.in_data)
4894 def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
4895 """Run an instance allocator and return the results.
4900 result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
4902 if not isinstance(result, tuple) or len(result) != 4:
4903 raise errors.OpExecError("Invalid result from master iallocator runner")
4905 rcode, stdout, stderr, fail = result
4907 if rcode == constants.IARUN_NOTFOUND:
4908 raise errors.OpExecError("Can't find allocator '%s'" % name)
4909 elif rcode == constants.IARUN_FAILURE:
4910 raise errors.OpExecError("Instance allocator call failed: %s,"
4912 (fail, stdout+stderr))
4913 self.out_text = stdout
4915 self._ValidateResult()
4917 def _ValidateResult(self):
4918 """Process the allocator results.
4920 This will process and if successful save the result in
4921 self.out_data and the other parameters.
4925 rdict = serializer.Load(self.out_text)
4926 except Exception, err:
4927 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
4929 if not isinstance(rdict, dict):
4930 raise errors.OpExecError("Can't parse iallocator results: not a dict")
4932 for key in "success", "info", "nodes":
4933 if key not in rdict:
4934 raise errors.OpExecError("Can't parse iallocator results:"
4935 " missing key '%s'" % key)
4936 setattr(self, key, rdict[key])
4938 if not isinstance(rdict["nodes"], list):
4939 raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
4941 self.out_data = rdict
4944 class LUTestAllocator(NoHooksLU):
4945 """Run allocator tests.
4947 This LU runs the allocator tests
4950 _OP_REQP = ["direction", "mode", "name"]
4952 def CheckPrereq(self):
4953 """Check prerequisites.
4955 This checks the opcode parameters depending on the director and mode test.
4958 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
4959 for attr in ["name", "mem_size", "disks", "disk_template",
4960 "os", "tags", "nics", "vcpus"]:
4961 if not hasattr(self.op, attr):
4962 raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
4964 iname = self.cfg.ExpandInstanceName(self.op.name)
4965 if iname is not None:
4966 raise errors.OpPrereqError("Instance '%s' already in the cluster" %
4968 if not isinstance(self.op.nics, list):
4969 raise errors.OpPrereqError("Invalid parameter 'nics'")
4970 for row in self.op.nics:
4971 if (not isinstance(row, dict) or
4974 "bridge" not in row):
4975 raise errors.OpPrereqError("Invalid contents of the"
4976 " 'nics' parameter")
4977 if not isinstance(self.op.disks, list):
4978 raise errors.OpPrereqError("Invalid parameter 'disks'")
4979 if len(self.op.disks) != 2:
4980 raise errors.OpPrereqError("Only two-disk configurations supported")
4981 for row in self.op.disks:
4982 if (not isinstance(row, dict) or
4983 "size" not in row or
4984 not isinstance(row["size"], int) or
4985 "mode" not in row or
4986 row["mode"] not in ['r', 'w']):
4987 raise errors.OpPrereqError("Invalid contents of the"
4988 " 'disks' parameter")
4989 elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
4990 if not hasattr(self.op, "name"):
4991 raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
4992 fname = self.cfg.ExpandInstanceName(self.op.name)
4994 raise errors.OpPrereqError("Instance '%s' not found for relocation" %
4996 self.op.name = fname
4997 self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
4999 raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
5002 if self.op.direction == constants.IALLOCATOR_DIR_OUT:
5003 if not hasattr(self.op, "allocator") or self.op.allocator is None:
5004 raise errors.OpPrereqError("Missing allocator name")
5005 elif self.op.direction != constants.IALLOCATOR_DIR_IN:
5006 raise errors.OpPrereqError("Wrong allocator test '%s'" %
5009 def Exec(self, feedback_fn):
5010 """Run the allocator test.
5013 if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
5014 ial = IAllocator(self.cfg, self.sstore,
5017 mem_size=self.op.mem_size,
5018 disks=self.op.disks,
5019 disk_template=self.op.disk_template,
5023 vcpus=self.op.vcpus,
5026 ial = IAllocator(self.cfg, self.sstore,
5029 relocate_from=list(self.relocate_from),
5032 if self.op.direction == constants.IALLOCATOR_DIR_IN:
5033 result = ial.in_text
5035 ial.Run(self.op.allocator, validate=False)
5036 result = ial.out_text