# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the master-side code."""

# pylint: disable-msg=W0613,W0201

import time
import re
import platform

from ganeti import rpc
from ganeti import ssh
from ganeti import logger
from ganeti import utils
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import opcodes
from ganeti import ssconf
from ganeti import serializer


class LogicalUnit(object):
  """Logical Unit base class.

  Subclasses must follow these rules:
    - implement ExpandNames
    - implement CheckPrereq
    - implement Exec
    - implement BuildHooksEnv
    - redefine HPATH and HTYPE
    - optionally redefine their run requirements:
        REQ_MASTER: the LU needs to run on the master node
        REQ_WSSTORE: the LU needs a writable SimpleStore
        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively

  Note that all commands require root permissions.

  """
  HPATH = None
  HTYPE = None
  _OP_REQP = []
  REQ_MASTER = True
  REQ_WSSTORE = False
  REQ_BGL = True

  def __init__(self, processor, op, context, sstore):
    """Constructor for LogicalUnit.

    This needs to be overridden in derived classes in order to check op
    validity.

    """
    self.proc = processor
    self.op = op
    self.cfg = context.cfg
    self.sstore = sstore
    self.context = context
    self.needed_locks = None
    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
    # Used to force good behavior when calling helper functions
    self.recalculate_locks = {}
    self.__ssh = None

    for attr_name in self._OP_REQP:
      attr_val = getattr(op, attr_name, None)
      if attr_val is None:
        raise errors.OpPrereqError("Required parameter '%s' missing" %
                                   attr_name)

    if not self.cfg.IsCluster():
      raise errors.OpPrereqError("Cluster not initialized yet,"
                                 " use 'gnt-cluster init' first.")
    if self.REQ_MASTER:
      master = sstore.GetMasterNode()
      if master != utils.HostInfo().name:
        raise errors.OpPrereqError("Commands must be run on the master"
                                   " node %s" % master)
107 """Returns the SshRunner object
111 self.__ssh = ssh.SshRunner(self.sstore)
114 ssh = property(fget=__GetSSH)

  def ExpandNames(self):
    """Expand names for this LU.

    This method is called before starting to execute the opcode, and it should
    update all the parameters of the opcode to their canonical form (e.g. a
    short node name must be fully expanded after this method has successfully
    completed). This way locking, hooks, logging, etc. can work correctly.

    LUs which implement this method must also populate the self.needed_locks
    member, as a dict with lock levels as keys, and a list of needed lock names
    as values. Rules:
      - Use an empty dict if you don't need any lock
      - If you don't need any lock at a particular level omit that level
      - Don't put anything for the BGL level
      - If you want all locks at a level use None as a value
        (this reflects what LockSet does, and will be replaced before
        CheckPrereq with the full list of nodes that have been locked)

    If you need to share locks (rather than acquire them exclusively) at one
    level you can modify self.share_locks, setting a true value (usually 1) for
    that level. By default locks are not shared.

    Examples:
    # Acquire all nodes and one instance
    self.needed_locks = {
      locking.LEVEL_NODE: None,
      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
    }
    # Acquire just two nodes
    self.needed_locks = {
      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
    }
    # Acquire no locks
    self.needed_locks = {} # No, you can't leave it to the default value None

    """
    # The implementation of this method is mandatory only if the new LU is
    # concurrent, so that old LUs don't need to be changed all at the same
    # time.
    if self.REQ_BGL:
      self.needed_locks = {} # Exclusive LUs don't need locks.
    else:
      raise NotImplementedError

  def DeclareLocks(self, level):
    """Declare LU locking needs for a level.

    While most LUs can just declare their locking needs at ExpandNames time,
    sometimes there's the need to calculate some locks after having acquired
    the ones before. This function is called just before acquiring locks at a
    particular level, but after acquiring the ones at lower levels, and permits
    such calculations. It can be used to modify self.needed_locks, and by
    default it does nothing.

    This function is only called if you have something already set in
    self.needed_locks for the level.

    @param level: Locking level which is going to be locked
    @type level: member of ganeti.locking.LEVELS

    """
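
  # Illustrative sketch (the same pattern appears verbatim in
  # LUStartupInstance and friends further down): a concurrent LU that can
  # only compute its node locks once the instance lock is held would
  # implement DeclareLocks as:
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()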

  def CheckPrereq(self):
    """Check prerequisites for this LU.

    This method should check that the prerequisites for the execution
    of this LU are fulfilled. It can do internode communication, but
    it should be idempotent - no cluster or system changes are
    allowed.

    The method should raise errors.OpPrereqError in case something is
    not fulfilled. Its return value is ignored.

    This method should also update all the parameters of the opcode to
    their canonical form if it hasn't been done by ExpandNames before.

    """
    raise NotImplementedError

  def Exec(self, feedback_fn):
    """Execute the LU.

    This method should implement the actual work. It should raise
    errors.OpExecError for failures that are somewhat dealt with in
    code, or expected.

    """
    raise NotImplementedError

  def BuildHooksEnv(self):
    """Build hooks environment for this LU.

    This method should return a three-element tuple consisting of: a dict
    containing the environment that will be used for running the
    specific hook for this LU, a list of node names on which the hook
    should run before the execution, and a list of node names on which
    the hook should run after the execution.

    The keys of the dict must not have 'GANETI_' prefixed as this will
    be handled in the hooks runner. Also note additional keys will be
    added by the hooks runner. If the LU doesn't define any
    environment, an empty dict (and not None) should be returned.

    No nodes should be returned as an empty list (and not None).

    Note that if the HPATH for a LU class is None, this function will
    not be called.

    """
    raise NotImplementedError

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Notify the LU about the results of its hooks.

    This method is called every time a hooks phase is executed, and notifies
    the Logical Unit about the hooks' result. The LU can then use it to alter
    its result based on the hooks. By default the method does nothing and the
    previous result is passed back unchanged but any LU can define it if it
    wants to use the local cluster hook-scripts somehow.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: the previous result this LU had, or None in the PRE phase.

    """
    return lu_result

  def _ExpandAndLockInstance(self):
    """Helper function to expand and lock an instance.

    Many LUs that work on an instance take its name in self.op.instance_name
    and need to expand it and then declare the expanded name for locking. This
    function does it, and then updates self.op.instance_name to the expanded
    name. It also initializes needed_locks as a dict, if this hasn't been done
    before.

    """
    if self.needed_locks is None:
      self.needed_locks = {}
    else:
      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
        "_ExpandAndLockInstance called with instance-level locks set"
    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    if expanded_name is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
    self.op.instance_name = expanded_name

  def _LockInstancesNodes(self):
    """Helper function to declare instances' nodes for locking.

    This function should be called after locking one or more instances to lock
    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
    with all primary or secondary nodes for instances already locked and
    present in self.needed_locks[locking.LEVEL_INSTANCE].

    It should be called from DeclareLocks, and for safety only works if
    self.recalculate_locks[locking.LEVEL_NODE] is set.

    In the future it may grow parameters to just lock some instance's nodes, or
    to just lock primaries or secondary nodes, if needed.

    It should be called in DeclareLocks in a way similar to:

    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

    """
    assert locking.LEVEL_NODE in self.recalculate_locks, \
      "_LockInstancesNodes helper function called with no nodes to recalculate"

    # TODO: check if we've really been called with the instance locks held

    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
    # future we might want to have different behaviors depending on the value
    # of self.recalculate_locks[locking.LEVEL_NODE]
    wanted_nodes = []
    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
      instance = self.context.cfg.GetInstanceInfo(instance_name)
      wanted_nodes.append(instance.primary_node)
      wanted_nodes.extend(instance.secondary_nodes)
    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes

    del self.recalculate_locks[locking.LEVEL_NODE]


class NoHooksLU(LogicalUnit):
  """Simple LU which runs no hooks.

  This LU is intended as a parent for other LogicalUnits which will
  run no hooks, in order to reduce duplicate code.

  """
  HPATH = None
  HTYPE = None
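

# Illustrative sketch (a hypothetical LU, not part of the real opcode set):
# the smallest possible concurrent LU following the rules laid out in the
# LogicalUnit docstring above.
#
#   class LUHypotheticalNoop(NoHooksLU):
#     _OP_REQP = []
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}  # a pure no-op needs no locks
#
#     def CheckPrereq(self):
#       pass
#
#     def Exec(self, feedback_fn):
#       feedback_fn("nothing to do")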


def _GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  Args:
    nodes: List of nodes (strings), or empty list for all

  """
  if not isinstance(nodes, list):
    raise errors.OpPrereqError("Invalid argument type 'nodes'")

  if nodes:
    wanted = []
    for name in nodes:
      node = lu.cfg.ExpandNodeName(name)
      if node is None:
        raise errors.OpPrereqError("No such node name '%s'" % name)
      wanted.append(node)
  else:
    wanted = lu.cfg.GetNodeList()
  return utils.NiceSort(wanted)
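

# Example usage (sketch; 'node1.example.tld' is hypothetical): the query LUs
# below call this from CheckPrereq to canonicalize user-supplied short names
# before using them for locking or RPC:
#
#   self.nodes = _GetWantedNodes(self, ['node1.example.tld'])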


def _GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  Args:
    instances: List of instances (strings), or empty list for all

  """
  if not isinstance(instances, list):
    raise errors.OpPrereqError("Invalid argument type 'instances'")

  if instances:
    wanted = []
    for name in instances:
      instance = lu.cfg.ExpandInstanceName(name)
      if instance is None:
        raise errors.OpPrereqError("No such instance name '%s'" % name)
      wanted.append(instance)
  else:
    wanted = lu.cfg.GetInstanceList()
  return utils.NiceSort(wanted)


def _CheckOutputFields(static, dynamic, selected):
  """Checks whether all selected fields are valid.

  Args:
    static: Static fields
    dynamic: Dynamic fields
    selected: Fields selected by the caller

  """
  static_fields = frozenset(static)
  dynamic_fields = frozenset(dynamic)

  all_fields = static_fields | dynamic_fields

  if not all_fields.issuperset(selected):
    raise errors.OpPrereqError("Unknown output fields selected: %s"
                               % ",".join(frozenset(selected).
                                          difference(all_fields)))
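

# Example usage (sketch, mirroring the query LUs below; the field names here
# are made up): any requested output field that is neither static nor dynamic
# raises OpPrereqError before the query runs:
#
#   _CheckOutputFields(static=["name"],
#                      dynamic=["mfree"],
#                      selected=self.op.output_fields)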


def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                          memory, vcpus, nics):
  """Builds instance related env variables for hooks from single variables.

  Args:
    secondary_nodes: List of secondary nodes as strings

  """
  env = {
    "INSTANCE_NAME": name,
    "INSTANCE_PRIMARY": primary_node,
    "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
    "INSTANCE_OS_TYPE": os_type,
    "INSTANCE_STATUS": status,
    "INSTANCE_MEMORY": memory,
    "INSTANCE_VCPUS": vcpus,
  }

  if nics:
    nic_count = len(nics)
    for idx, (ip, bridge, mac) in enumerate(nics):
      if ip is None:
        ip = ""
      env["INSTANCE_NIC%d_IP" % idx] = ip
      env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
      env["INSTANCE_NIC%d_HWADDR" % idx] = mac
  else:
    nic_count = 0

  env["INSTANCE_NIC_COUNT"] = nic_count

  return env


def _BuildInstanceHookEnvByObject(instance, override=None):
  """Builds instance related env variables for hooks from an object.

  Args:
    instance: objects.Instance object of instance
    override: dict of values to override

  """
  args = {
    'name': instance.name,
    'primary_node': instance.primary_node,
    'secondary_nodes': instance.secondary_nodes,
    'os_type': instance.os,
    'status': instance.status,
    'memory': instance.memory,
    'vcpus': instance.vcpus,
    'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
  }
  if override:
    args.update(override)
  return _BuildInstanceHookEnv(**args)


def _CheckInstanceBridgesExist(instance):
  """Check that the bridges needed by an instance exist.

  """
  # check bridges existence
  brlist = [nic.bridge for nic in instance.nics]
  if not rpc.call_bridges_exist(instance.primary_node, brlist):
    raise errors.OpPrereqError("One or more target bridges %s do not"
                               " exist on destination node '%s'" %
                               (brlist, instance.primary_node))
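

# Example usage (as LUStartupInstance and LURebootInstance do below): called
# from CheckPrereq so that a missing bridge aborts the opcode before any
# cluster state has been touched:
#
#   _CheckInstanceBridgesExist(instance)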


class LUDestroyCluster(NoHooksLU):
  """Logical unit for destroying the cluster.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signalled by raising errors.OpPrereqError.

    """
    master = self.sstore.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1))
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist))

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master = self.sstore.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    utils.CreateBackup(priv_key)
    utils.CreateBackup(pub_key)
    rpc.call_node_leave_cluster(master)


class LUVerifyCluster(LogicalUnit):
  """Verifies the cluster status.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["skip_checks"]

  def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                  remote_version, feedback_fn):
    """Run multiple tests against a node.

    Test list:
      - compares ganeti version
      - checks vg existence and size > 20G
      - checks config file checksum
      - checks ssh to other nodes

    Args:
      node: name of the node to check
      file_list: required list of files
      local_cksum: dictionary of local files and their checksums

    """
    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    if not remote_version:
      feedback_fn(" - ERROR: connection to %s failed" % (node))
      return True

    if local_version != remote_version:
      feedback_fn(" - ERROR: sw version mismatch: master %s, node(%s) %s" %
                  (local_version, node, remote_version))
      return True

    # checks vg existence and size > 20G
    bad = False
    if not vglist:
      feedback_fn(" - ERROR: unable to check volume groups on node %s." %
                  (node,))
      bad = True
    else:
      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        feedback_fn(" - ERROR: %s on node %s" % (vgstatus, node))
        bad = True

    # checks config file checksum
    if 'filelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned file checksum data")
    else:
      remote_cksum = node_result['filelist']
      for file_name in file_list:
        if file_name not in remote_cksum:
          bad = True
          feedback_fn(" - ERROR: file '%s' missing" % file_name)
        elif remote_cksum[file_name] != local_cksum[file_name]:
          bad = True
          feedback_fn(" - ERROR: file '%s' has wrong checksum" % file_name)

    if 'nodelist' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node ssh connectivity data")
    elif node_result['nodelist']:
      bad = True
      for node in node_result['nodelist']:
        feedback_fn(" - ERROR: ssh communication with node '%s': %s" %
                    (node, node_result['nodelist'][node]))

    if 'node-net-test' not in node_result:
      bad = True
      feedback_fn(" - ERROR: node hasn't returned node tcp connectivity data")
    elif node_result['node-net-test']:
      bad = True
      nlist = utils.NiceSort(node_result['node-net-test'].keys())
      for node in nlist:
        feedback_fn(" - ERROR: tcp communication with node '%s': %s" %
                    (node, node_result['node-net-test'][node]))

    hyp_result = node_result.get('hypervisor', None)
    if hyp_result is not None:
      feedback_fn(" - ERROR: hypervisor verify failure: '%s'" % hyp_result)
      bad = True

    return bad

  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
                      node_instance, feedback_fn):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node.

    """
    bad = False

    node_current = instanceconfig.primary_node

    node_vol_should = {}
    instanceconfig.MapLVsByNode(node_vol_should)

    for node in node_vol_should:
      for volume in node_vol_should[node]:
        if node not in node_vol_is or volume not in node_vol_is[node]:
          feedback_fn(" - ERROR: volume %s missing on node %s" %
                      (volume, node))
          bad = True

    if instanceconfig.status != 'down':
      if (node_current not in node_instance or
          instance not in node_instance[node_current]):
        feedback_fn(" - ERROR: instance %s not running on node %s" %
                    (instance, node_current))
        bad = True

    for node in node_instance:
      if node != node_current:
        if instance in node_instance[node]:
          feedback_fn(" - ERROR: instance %s should not run on node %s" %
                      (instance, node))
          bad = True

    return bad

  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    """
    bad = False

    for node in node_vol_is:
      for volume in node_vol_is[node]:
        if node not in node_vol_should or volume not in node_vol_should[node]:
          feedback_fn(" - ERROR: volume %s on node %s should not exist" %
                      (volume, node))
          bad = True
    return bad

  def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
    """Verify the list of running instances.

    This checks what instances are running but unknown to the cluster.

    """
    bad = False
    for node in node_instance:
      for runninginstance in node_instance[node]:
        if runninginstance not in instancelist:
          feedback_fn(" - ERROR: instance %s on node %s should not exist" %
                      (runninginstance, node))
          bad = True
    return bad

  def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the instances it
    was primary for.

    """
    bad = False

    for node, nodeinfo in node_info.iteritems():
      # This code checks that every node which is now listed as secondary has
      # enough memory to host all instances it is supposed to should a single
      # other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well as up
      # ones, considering that even if they're down someone might want to start
      # them even in the event of a node failure.
      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
        needed_mem = 0
        for instance in instances:
          needed_mem += instance_cfg[instance].memory
        if nodeinfo['mfree'] < needed_mem:
          feedback_fn(" - ERROR: not enough memory on node %s to accommodate"
                      " failovers should node %s fail" % (node, prinode))
          bad = True
    return bad

  def CheckPrereq(self):
    """Check prerequisites.

    Transform the list of checks we're going to skip into a set and check that
    all its members are valid.

    """
    self.skip_set = frozenset(self.op.skip_checks)
    if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
      raise errors.OpPrereqError("Invalid checks to be skipped specified")

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just run in the post phase; their failure causes the
    output to be logged in the verify output and makes the verification fail.

    """
    all_nodes = self.cfg.GetNodeList()
    # TODO: populate the environment with useful information for verify hooks
    env = {}
    return env, [], all_nodes

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    bad = False
    feedback_fn("* Verifying global settings")
    for msg in self.cfg.VerifyConfig():
      feedback_fn(" - ERROR: %s" % msg)

    vg_name = self.cfg.GetVGName()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
    instancelist = utils.NiceSort(self.cfg.GetInstanceList())
    i_non_redundant = [] # Non redundant instances
    node_volume = {}
    node_instance = {}
    node_info = {}
    instance_cfg = {}

    # FIXME: verify OS list
    # do local checksums
    file_names = list(self.sstore.GetFileList())
    file_names.append(constants.SSL_CERT_FILE)
    file_names.append(constants.CLUSTER_CONF_FILE)
    local_checksums = utils.FingerprintFiles(file_names)

    feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
    all_volumeinfo = rpc.call_volume_list(nodelist, vg_name)
    all_instanceinfo = rpc.call_instance_list(nodelist)
    all_vglist = rpc.call_vg_list(nodelist)
    node_verify_param = {
      'filelist': file_names,
      'nodelist': nodelist,
      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
                        for node in nodeinfo],
      }
    all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
    all_rversion = rpc.call_version(nodelist)
    all_ninfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())

    for node in nodelist:
      feedback_fn("* Verifying node %s" % node)
      result = self._VerifyNode(node, file_names, local_checksums,
                                all_vglist[node], all_nvinfo[node],
                                all_rversion[node], feedback_fn)
      bad = bad or result

      # node_volume
      volumeinfo = all_volumeinfo[node]

      if isinstance(volumeinfo, basestring):
        feedback_fn(" - ERROR: LVM problem on node %s: %s" %
                    (node, volumeinfo[-400:].encode('string_escape')))
        bad = True
        node_volume[node] = {}
      elif not isinstance(volumeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue
      else:
        node_volume[node] = volumeinfo

      # node_instance
      nodeinstance = all_instanceinfo[node]
      if type(nodeinstance) != list:
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      node_instance[node] = nodeinstance

      # node_info
      nodeinfo = all_ninfo[node]
      if not isinstance(nodeinfo, dict):
        feedback_fn(" - ERROR: connection to %s failed" % (node,))
        bad = True
        continue

      try:
        node_info[node] = {
          "mfree": int(nodeinfo['memory_free']),
          "dfree": int(nodeinfo['vg_free']),
          "pinst": [],
          "sinst": [],
          # dictionary holding all instances this node is secondary for,
          # grouped by their primary node. Each key is a cluster node, and each
          # value is a list of instances which have the key as primary and the
          # current node as secondary. This is handy to calculate N+1 memory
          # availability if you can only failover from a primary to its
          # secondary.
          "sinst-by-pnode": {},
          }
      except ValueError:
        feedback_fn(" - ERROR: invalid value returned from node %s" % (node,))
        bad = True
        continue

    node_vol_should = {}

    for instance in instancelist:
      feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.cfg.GetInstanceInfo(instance)
      result = self._VerifyInstance(instance, inst_config, node_volume,
                                    node_instance, feedback_fn)
      bad = bad or result

      inst_config.MapLVsByNode(node_vol_should)

      instance_cfg[instance] = inst_config

      pnode = inst_config.primary_node
      if pnode in node_info:
        node_info[pnode]['pinst'].append(instance)
      else:
        feedback_fn(" - ERROR: instance %s, connection to primary node"
                    " %s failed" % (instance, pnode))
        bad = True

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant. On the other hand we have no disk
      # templates with more than one secondary so that situation is not well
      # supported either.
      # FIXME: does not support file-backed instances
      if len(inst_config.secondary_nodes) == 0:
        i_non_redundant.append(instance)
      elif len(inst_config.secondary_nodes) > 1:
        feedback_fn(" - WARNING: multiple secondaries for instance %s"
                    % instance)

      for snode in inst_config.secondary_nodes:
        if snode in node_info:
          node_info[snode]['sinst'].append(instance)
          if pnode not in node_info[snode]['sinst-by-pnode']:
            node_info[snode]['sinst-by-pnode'][pnode] = []
          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
        else:
          feedback_fn(" - ERROR: instance %s, connection to secondary node"
                      " %s failed" % (instance, snode))
          bad = True

    feedback_fn("* Verifying orphan volumes")
    result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
                                       feedback_fn)
    bad = bad or result

    feedback_fn("* Verifying remaining instances")
    result = self._VerifyOrphanInstances(instancelist, node_instance,
                                         feedback_fn)
    bad = bad or result

    if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
      feedback_fn("* Verifying N+1 Memory redundancy")
      result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
      bad = bad or result

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    return not bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result, handle it, and send some
    nicely-formatted feedback back to the user.

    Args:
      phase: the hooks phase that has just been run
      hooks_results: the results of the multi-node hooks rpc call
      feedback_fn: function to send feedback back to the caller
      lu_result: previous Exec result

    """
    # We only really run POST phase hooks, and are only interested in
    # their results
    if phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      indent_re = re.compile('^', re.M)
      feedback_fn("* Hooks Results")
      if not hooks_results:
        feedback_fn(" - ERROR: general communication failure")
        lu_result = 1
      else:
        for node_name in hooks_results:
          show_node_header = True
          res = hooks_results[node_name]
          if res is False or not isinstance(res, list):
            feedback_fn("    Communication failure")
            lu_result = 1
            continue
          for script, hkr, output in res:
            if hkr == constants.HKR_FAIL:
              # The node header is only shown once, if there are
              # failing hooks on that node
              if show_node_header:
                feedback_fn("  Node %s:" % node_name)
                show_node_header = False
              feedback_fn("    ERROR: Script %s failed, output:" % script)
              output = indent_re.sub('      ', output)
              feedback_fn("%s" % output)
              lu_result = 1

      return lu_result


class LUVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  _OP_REQP = []

  def CheckPrereq(self):
    """Check prerequisites.

    This has no prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Verify integrity of cluster disks.

    """
    result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}

    vg_name = self.cfg.GetVGName()
    nodes = utils.NiceSort(self.cfg.GetNodeList())
    instances = [self.cfg.GetInstanceInfo(name)
                 for name in self.cfg.GetInstanceList()]

    nv_dict = {}
    for inst in instances:
      inst_lvs = {}
      if (inst.status != "up" or
          inst.disk_template not in constants.DTS_NET_MIRROR):
        continue
      inst.MapLVsByNode(inst_lvs)
      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
      for node, vol_list in inst_lvs.iteritems():
        for vol in vol_list:
          nv_dict[(node, vol)] = inst

    if not nv_dict:
      return result

    node_lvs = rpc.call_volume_list(nodes, vg_name)

    for node in nodes:
      lvs = node_lvs[node]

      if isinstance(lvs, basestring):
        logger.Info("error enumerating LVs on node %s: %s" % (node, lvs))
        res_nlvm[node] = lvs
        continue
      elif not isinstance(lvs, dict):
        logger.Info("connection to node %s failed or invalid data returned" %
                    (node,))
        res_nodes.append(node)
        continue

      for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
        inst = nv_dict.pop((node, lv_name), None)
        if (not lv_online and inst is not None
            and inst.name not in res_instances):
          res_instances.append(inst.name)

    # any leftover items in nv_dict are missing LVs, let's arrange the
    # data better
    for key, inst in nv_dict.iteritems():
      if inst.name not in res_missing:
        res_missing[inst.name] = []
      res_missing[inst.name].append(key)

    return result


class LURenameCluster(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["name"]
  REQ_WSSTORE = True

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_NAME": self.op.name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = utils.HostInfo(self.op.name)

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.sstore.GetClusterName()
    old_ip = self.sstore.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed")
    if new_ip != old_ip:
      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network. Aborting." %
                                   new_ip)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    ip = self.ip
    ss = self.sstore

    # shutdown the master IP
    master = ss.GetMasterNode()
    if not rpc.call_node_stop_master(master, False):
      raise errors.OpExecError("Could not disable the master role")

    try:
      # modify the sstore
      ss.SetKey(ss.SS_MASTER_IP, ip)
      ss.SetKey(ss.SS_CLUSTER_NAME, clustername)

      # Distribute updated ss config to all nodes
      myself = self.cfg.GetNodeInfo(master)
      dist_nodes = self.cfg.GetNodeList()
      if myself.name in dist_nodes:
        dist_nodes.remove(myself.name)

      logger.Debug("Copying updated ssconf data to all nodes")
      for keyname in [ss.SS_CLUSTER_NAME, ss.SS_MASTER_IP]:
        fname = ss.KeyToFilename(keyname)
        result = rpc.call_upload_file(dist_nodes, fname)
        for to_node in dist_nodes:
          if not result[to_node]:
            logger.Error("copy of file %s to node %s failed" %
                         (fname, to_node))
    finally:
      if not rpc.call_node_start_master(master, False):
        logger.Error("Could not re-enable the master role on the master,"
                     " please restart manually.")


def _RecursiveCheckIfLVMBased(disk):
  """Check if the given disk or its children are lvm-based.

  Args:
    disk: ganeti.objects.Disk object

  Returns:
    boolean indicating whether a LD_LV dev_type was found or not

  """
  if disk.children:
    for chdisk in disk.children:
      if _RecursiveCheckIfLVMBased(chdisk):
        return True
  return disk.dev_type == constants.LD_LV
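

# For illustration (hedged: the Disk objects are hypothetical and LD_DRBD8
# is assumed to be the DRBD dev_type constant): the recursion returns True
# as soon as any child is an LVM logical volume, even when the top-level
# device is not itself LVM-based:
#
#   lv = objects.Disk(dev_type=constants.LD_LV, size=1024)
#   drbd = objects.Disk(dev_type=constants.LD_DRBD8, children=[lv, lv])
#   _RecursiveCheckIfLVMBased(drbd)  # -> True, via the LV children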


class LUSetClusterParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  _OP_REQP = ["vg_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    env = {
      "OP_TARGET": self.sstore.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }
    mn = self.sstore.GetMasterNode()
    return env, [mn], [mn]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if not self.op.vg_name:
      instances = [self.cfg.GetInstanceInfo(name)
                   for name in self.cfg.GetInstanceList()]
      for inst in instances:
        for disk in inst.disks:
          if _RecursiveCheckIfLVMBased(disk):
            raise errors.OpPrereqError("Cannot disable lvm storage while"
                                       " lvm-based instances exist")

    # if vg_name is not None, check the given volume group on all nodes
    if self.op.vg_name:
      node_list = self.cfg.GetNodeList()
      vglist = rpc.call_vg_list(node_list)
      for node in node_list:
        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name != self.cfg.GetVGName():
      self.cfg.SetVGName(self.op.vg_name)
    else:
      feedback_fn("Cluster LVM configuration already in desired"
                  " state, not changing")


def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
  """Sleep and poll for an instance's disk to sync.

  """
  if not instance.disks:
    return True

  if not oneshot:
    proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)

  node = instance.primary_node

  for dev in instance.disks:
    cfgw.SetDiskID(dev, node)

  retries = 0
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = rpc.call_blockdev_getmirrorstatus(node, instance.disks)
    if not rstats:
      proc.LogWarning("Can't get any data from node %s" % node)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node)
      time.sleep(6)
      continue
    retries = 0
    for i in range(len(rstats)):
      mstat = rstats[i]
      if mstat is None:
        proc.LogWarning("Can't compute data for node %s/%s" %
                        (node, instance.disks[i].iv_name))
        continue
      # we ignore the ldisk parameter
      perc_done, est_time, is_degraded, _ = mstat
      cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
      if perc_done is not None:
        done = False
        if est_time is not None:
          rem_time = "%d estimated seconds remaining" % est_time
          max_time = est_time
        else:
          rem_time = "no time estimate"
        proc.LogInfo("- device %s: %5.2f%% done, %s" %
                     (instance.disks[i].iv_name, perc_done, rem_time))
    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    proc.LogInfo("Instance %s's disks are in sync." % instance.name)

  return not cumul_degraded


def _CheckDiskConsistency(cfgw, dev, node, on_primary, ldisk=False):
  """Check that mirrors are not degraded.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  """
  cfgw.SetDiskID(dev, node)
  if ldisk:
    idx = 6
  else:
    idx = 5

  result = True
  if on_primary or dev.AssembleOnSecondary():
    rstats = rpc.call_blockdev_find(node, dev)
    if not rstats:
      logger.ToStderr("Node %s: Disk degraded, not found or node down" % node)
      result = False
    else:
      result = result and (not rstats[idx])
  if dev.children:
    for child in dev.children:
      result = result and _CheckDiskConsistency(cfgw, child, node, on_primary)

  return result
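

# Example usage (sketch; 'dev' stands for any objects.Disk of the instance):
# callers typically check each disk on the primary node and abort on
# degradation:
#
#   if not _CheckDiskConsistency(self.cfg, dev, instance.primary_node,
#                                on_primary=True):
#     raise errors.OpExecError("Disk %s is degraded" % dev.iv_name)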


class LUDiagnoseOS(NoHooksLU):
  """Logical unit for OS diagnose/query.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This always succeeds, since this is a pure query LU.

    """
    if self.op.names:
      raise errors.OpPrereqError("Selective OS query not supported")

    self.dynamic_fields = frozenset(["name", "valid", "node_status"])
    _CheckOutputFields(static=[],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

  @staticmethod
  def _DiagnoseByOS(node_list, rlist):
    """Remaps a per-node return list into a per-os per-node dictionary.

    Args:
      node_list: a list with the names of all nodes
      rlist: a map with node names as keys and OS objects as values

    Returns:
      map: a map with osnames as keys and as value another map, with
           node names as keys and lists of OS objects as values, e.g.
           {"debian-etch": {"node1": [<object>,...],
                            "node2": [<object>,]}
           }

    """
    all_os = {}
    for node_name, nr in rlist.iteritems():
      if not nr:
        continue
      for os_obj in nr:
        if os_obj.name not in all_os:
          # build a list of nodes for this os containing empty lists
          # for each node in node_list
          all_os[os_obj.name] = {}
          for nname in node_list:
            all_os[os_obj.name][nname] = []
        all_os[os_obj.name][node_name].append(os_obj)
    return all_os

  def Exec(self, feedback_fn):
    """Compute the list of OSes.

    """
    node_list = self.cfg.GetNodeList()
    node_data = rpc.call_os_diagnose(node_list)
    if node_data == False:
      raise errors.OpExecError("Can't gather the list of OSes")
    pol = self._DiagnoseByOS(node_list, node_data)
    output = []
    for os_name, os_data in pol.iteritems():
      row = []
      for field in self.op.output_fields:
        if field == "name":
          val = os_name
        elif field == "valid":
          val = utils.all([osl and osl[0] for osl in os_data.values()])
        elif field == "node_status":
          val = {}
          for node_name, nos_list in os_data.iteritems():
            val[node_name] = [(v.status, v.path) for v in nos_list]
        else:
          raise errors.ParameterError(field)
        row.append(val)
      output.append(row)

    return output


class LURemoveNode(LogicalUnit):
  """Logical unit for removing a node.

  """
  HPATH = "node-remove"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This doesn't run on the target node in the pre phase as a failed
    node would then be impossible to remove.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      }
    all_nodes = self.cfg.GetNodeList()
    all_nodes.remove(self.op.node_name)
    return env, all_nodes, all_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the node exists in the configuration
    - it does not have primary or secondary instances
    - it's not the master

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
    if node is None:
      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name)

    instance_list = self.cfg.GetInstanceList()

    masternode = self.sstore.GetMasterNode()
    if node.name == masternode:
      raise errors.OpPrereqError("Node is the master node,"
                                 " you need to failover first.")

    for instance_name in instance_list:
      instance = self.cfg.GetInstanceInfo(instance_name)
      if node.name == instance.primary_node:
        raise errors.OpPrereqError("Instance %s still running on the node,"
                                   " please remove first." % instance_name)
      if node.name in instance.secondary_nodes:
        raise errors.OpPrereqError("Instance %s has node as a secondary,"
                                   " please remove first." % instance_name)
    self.op.node_name = node.name
    self.node = node

  def Exec(self, feedback_fn):
    """Removes the node from the cluster.

    """
    node = self.node
    logger.Info("stopping the node daemon and removing configs from node %s" %
                node.name)

    rpc.call_node_leave_cluster(node.name)

    logger.Info("Removing node %s from config" % node.name)

    self.cfg.RemoveNode(node.name)
    # Remove the node from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_NODE, node.name)

    utils.RemoveHostFromEtcHosts(node.name)


class LUQueryNodes(NoHooksLU):
  """Logical unit for querying nodes.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset([
      "dtotal", "dfree",
      "mtotal", "mnode", "mfree",
      "bootid",
      "ctotal",
      ])

    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
                               "pinst_list", "sinst_list",
                               "pip", "sip", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedNodes(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.wanted
    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]

    # begin data gathering

    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for name in nodenames:
        nodeinfo = node_data.get(name, None)
        if nodeinfo:
          live_data[name] = {
            "mtotal": utils.TryConvert(int, nodeinfo['memory_total']),
            "mnode": utils.TryConvert(int, nodeinfo['memory_dom0']),
            "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
            "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
            "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
            "bootid": nodeinfo['bootid'],
            }
        else:
          live_data[name] = {}
    else:
      live_data = dict.fromkeys(nodenames, {})

    node_to_primary = dict([(name, set()) for name in nodenames])
    node_to_secondary = dict([(name, set()) for name in nodenames])

    inst_fields = frozenset(("pinst_cnt", "pinst_list",
                             "sinst_cnt", "sinst_list"))
    if inst_fields & frozenset(self.op.output_fields):
      instancelist = self.cfg.GetInstanceList()

      for instance_name in instancelist:
        inst = self.cfg.GetInstanceInfo(instance_name)
        if inst.primary_node in node_to_primary:
          node_to_primary[inst.primary_node].add(inst.name)
        for secnode in inst.secondary_nodes:
          if secnode in node_to_secondary:
            node_to_secondary[secnode].add(inst.name)

    # end data gathering

    output = []
    for node in nodelist:
      node_output = []
      for field in self.op.output_fields:
        if field == "name":
          val = node.name
        elif field == "pinst_list":
          val = list(node_to_primary[node.name])
        elif field == "sinst_list":
          val = list(node_to_secondary[node.name])
        elif field == "pinst_cnt":
          val = len(node_to_primary[node.name])
        elif field == "sinst_cnt":
          val = len(node_to_secondary[node.name])
        elif field == "pip":
          val = node.primary_ip
        elif field == "sip":
          val = node.secondary_ip
        elif field == "tags":
          val = list(node.GetTags())
        elif field in self.dynamic_fields:
          val = live_data[node.name].get(field, None)
        else:
          raise errors.ParameterError(field)
        node_output.append(val)
      output.append(node_output)

    return output


class LUQueryNodeVolumes(NoHooksLU):
  """Logical unit for getting volumes on node(s).

  """
  _OP_REQP = ["nodes", "output_fields"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.nodes = _GetWantedNodes(self, self.op.nodes)

    _CheckOutputFields(static=["node"],
                       dynamic=["phys", "vg", "name", "size", "instance"],
                       selected=self.op.output_fields)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    nodenames = self.nodes
    volumes = rpc.call_node_volumes(nodenames)

    ilist = [self.cfg.GetInstanceInfo(iname) for iname
             in self.cfg.GetInstanceList()]

    lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])

    output = []
    for node in nodenames:
      if node not in volumes or not volumes[node]:
        continue

      node_vols = volumes[node][:]
      node_vols.sort(key=lambda vol: vol['dev'])

      for vol in node_vols:
        node_output = []
        for field in self.op.output_fields:
          if field == "node":
            val = node
          elif field == "phys":
            val = vol['dev']
          elif field == "vg":
            val = vol['vg']
          elif field == "name":
            val = vol['name']
          elif field == "size":
            val = int(float(vol['size']))
          elif field == "instance":
            for inst in ilist:
              if node not in lv_by_node[inst]:
                continue
              if vol['name'] in lv_by_node[inst][node]:
                break
            else:
              inst = None
            val = inst and inst.name
          else:
            raise errors.ParameterError(field)
          node_output.append(str(val))

        output.append(node_output)

    return output


class LUAddNode(LogicalUnit):
  """Logical unit for adding node to the cluster.

  """
  HPATH = "node-add"
  HTYPE = constants.HTYPE_NODE
  _OP_REQP = ["node_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on all nodes before, and on all nodes + the new node after.

    """
    env = {
      "OP_TARGET": self.op.node_name,
      "NODE_NAME": self.op.node_name,
      "NODE_PIP": self.op.primary_ip,
      "NODE_SIP": self.op.secondary_ip,
      }
    nodes_0 = self.cfg.GetNodeList()
    nodes_1 = nodes_0 + [self.op.node_name, ]
    return env, nodes_0, nodes_1

  def CheckPrereq(self):
    """Check prerequisites.

    This checks:
    - the new node is not already in the config
    - its parameters (single/dual homed) match the cluster's

    Any errors are signalled by raising errors.OpPrereqError.

    """
    node_name = self.op.node_name
    cfg = self.cfg

    dns_data = utils.HostInfo(node_name)

    node = dns_data.name
    primary_ip = self.op.primary_ip = dns_data.ip
    secondary_ip = getattr(self.op, "secondary_ip", None)
    if secondary_ip is None:
      secondary_ip = primary_ip
    if not utils.IsValidIP(secondary_ip):
      raise errors.OpPrereqError("Invalid secondary IP given")
    self.op.secondary_ip = secondary_ip

    node_list = cfg.GetNodeList()
    if not self.op.readd and node in node_list:
      raise errors.OpPrereqError("Node %s is already in the configuration" %
                                 node)
    elif self.op.readd and node not in node_list:
      raise errors.OpPrereqError("Node %s is not in the configuration" % node)

    for existing_node_name in node_list:
      existing_node = cfg.GetNodeInfo(existing_node_name)

      if self.op.readd and node == existing_node_name:
        if (existing_node.primary_ip != primary_ip or
            existing_node.secondary_ip != secondary_ip):
          raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                     " address configuration as before")
        continue

      if (existing_node.primary_ip == primary_ip or
          existing_node.secondary_ip == primary_ip or
          existing_node.primary_ip == secondary_ip or
          existing_node.secondary_ip == secondary_ip):
        raise errors.OpPrereqError("New node ip address(es) conflict with"
                                   " existing node %s" % existing_node.name)

    # check that the type of the node (single versus dual homed) is the
    # same as for the master
    myself = cfg.GetNodeInfo(self.sstore.GetMasterNode())
    master_singlehomed = myself.secondary_ip == myself.primary_ip
    newbie_singlehomed = secondary_ip == primary_ip
    if master_singlehomed != newbie_singlehomed:
      if master_singlehomed:
        raise errors.OpPrereqError("The master has no private ip but the"
                                   " new node has one")
      else:
        raise errors.OpPrereqError("The master has a private ip but the"
                                   " new node doesn't have one")

    # check reachability
    if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
      raise errors.OpPrereqError("Node not reachable by ping")

    if not newbie_singlehomed:
      # check reachability from my secondary ip to newbie's secondary ip
      if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
                           source=myself.secondary_ip):
        raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
                                   " based ping to noded port")

    self.new_node = objects.Node(name=node,
                                 primary_ip=primary_ip,
                                 secondary_ip=secondary_ip)

  def Exec(self, feedback_fn):
    """Adds the new node to the cluster.

    """
    new_node = self.new_node
    node = new_node.name

    # check connectivity
    result = rpc.call_version([node])[node]
    if result:
      if constants.PROTOCOL_VERSION == result:
        logger.Info("communication to node %s fine, sw version %s match" %
                    (node, result))
      else:
        raise errors.OpExecError("Version mismatch master version %s,"
                                 " node version %s" %
                                 (constants.PROTOCOL_VERSION, result))
    else:
      raise errors.OpExecError("Cannot get version from the new node")

    # setup ssh on node
    logger.Info("copy ssh key to node %s" % node)
    priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
    keyarray = []
    keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
                constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
                priv_key, pub_key]

    for i in keyfiles:
      f = open(i, 'r')
      try:
        keyarray.append(f.read())
      finally:
        f.close()

    result = rpc.call_node_add(node, keyarray[0], keyarray[1], keyarray[2],
                               keyarray[3], keyarray[4], keyarray[5])

    if not result:
      raise errors.OpExecError("Cannot transfer ssh keys to the new node")

    # Add node to our /etc/hosts, and add key to known_hosts
    utils.AddHostToEtcHosts(new_node.name)

    if new_node.secondary_ip != new_node.primary_ip:
      if not rpc.call_node_tcp_ping(new_node.name,
                                    constants.LOCALHOST_IP_ADDRESS,
                                    new_node.secondary_ip,
                                    constants.DEFAULT_NODED_PORT,
                                    10, False):
        raise errors.OpExecError("Node claims it doesn't have the secondary ip"
                                 " you gave (%s). Please fix and re-run this"
                                 " command." % new_node.secondary_ip)

    node_verify_list = [self.sstore.GetMasterNode()]
    node_verify_param = {
      'nodelist': [node],
      # TODO: do a node-net-test as well?
      }

    result = rpc.call_node_verify(node_verify_list, node_verify_param)
    for verifier in node_verify_list:
      if not result[verifier]:
        raise errors.OpExecError("Cannot communicate with %s's node daemon"
                                 " for remote verification" % verifier)
      if result[verifier]['nodelist']:
        for failed in result[verifier]['nodelist']:
          feedback_fn("ssh/hostname verification failed %s -> %s" %
                      (verifier, result[verifier]['nodelist'][failed]))
        raise errors.OpExecError("ssh/hostname verification failed.")

    # Distribute updated /etc/hosts and known_hosts to all nodes,
    # including the node just added
    myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
    dist_nodes = self.cfg.GetNodeList()
    if not self.op.readd:
      dist_nodes.append(node)
    if myself.name in dist_nodes:
      dist_nodes.remove(myself.name)

    logger.Debug("Copying hosts and known_hosts to all nodes")
    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
      result = rpc.call_upload_file(dist_nodes, fname)
      for to_node in dist_nodes:
        if not result[to_node]:
          logger.Error("copy of file %s to node %s failed" %
                       (fname, to_node))

    to_copy = self.sstore.GetFileList()
    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
      to_copy.append(constants.VNC_PASSWORD_FILE)
    for fname in to_copy:
      result = rpc.call_upload_file([node], fname)
      if not result[node]:
        logger.Error("could not copy file %s to node %s" % (fname, node))

    if not self.op.readd:
      logger.Info("adding node %s to cluster.conf" % node)
      self.cfg.AddNode(new_node)
      # Add the new node to the Ganeti Lock Manager
      self.context.glm.add(locking.LEVEL_NODE, node)


class LUQueryClusterInfo(NoHooksLU):
  """Query cluster configuration.

  """
  _OP_REQP = []
  REQ_MASTER = False
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites needed for this LU.

    """
    pass

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    result = {
      "name": self.sstore.GetClusterName(),
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": constants.OS_API_VERSION,
      "export_version": constants.EXPORT_VERSION,
      "master": self.sstore.GetMasterNode(),
      "architecture": (platform.architecture()[0], platform.machine()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      }

    return result


class LUDumpClusterConfig(NoHooksLU):
  """Return a text-representation of the cluster-config.

  """
  _OP_REQP = []
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def CheckPrereq(self):
    """No prerequisites.

    """
    pass

  def Exec(self, feedback_fn):
    """Dump a representation of the cluster config to the standard output.

    """
    return self.cfg.DumpConfig()


class LUActivateInstanceDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Activate the disks.

    """
    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    return disks_info


def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  Args:
    instance: a ganeti.objects.Instance object
    ignore_secondaries: if true, errors on secondary nodes won't result
                        in an error return from the function

  Returns:
    false if the operation failed
    list of (host, instance_visible_name, node_visible_name) if the operation
    succeeded with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True
  iname = instance.name
  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occurred, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, False)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=False, pass=1)" % (inst_disk.iv_name, node))
        if not ignore_secondaries:
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for inst_disk in instance.disks:
    for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
      if node != instance.primary_node:
        continue
      cfg.SetDiskID(node_disk, node)
      result = rpc.call_blockdev_assemble(node, node_disk, iname, True)
      if not result:
        logger.Error("could not prepare block device %s on node %s"
                     " (is_primary=True, pass=2)" % (inst_disk.iv_name, node))
        disks_ok = False
    device_info.append((instance.primary_node, inst_disk.iv_name, result))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  for disk in instance.disks:
    cfg.SetDiskID(disk, instance.primary_node)

  return disks_ok, device_info


def _StartInstanceDisks(cfg, instance, force):
  """Start the disks of an instance.

  """
  disks_ok, dummy = _AssembleInstanceDisks(instance, cfg,
                                           ignore_secondaries=force)
  if not disks_ok:
    _ShutdownInstanceDisks(instance, cfg)
    if force is not None and not force:
      logger.Error("If the message above refers to a secondary node,"
                   " you can retry the operation using '--force'.")
    raise errors.OpExecError("Disk consistency error")


class LUDeactivateInstanceDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Deactivate the disks.

    """
    instance = self.instance
    ins_l = rpc.call_instance_list([instance.primary_node])
    ins_l = ins_l[instance.primary_node]
    if not isinstance(ins_l, list):
      raise errors.OpExecError("Can't contact node '%s'" %
                               instance.primary_node)

    if self.instance.name in ins_l:
      raise errors.OpExecError("Instance is running, can't shutdown"
                               " block devices.")

    _ShutdownInstanceDisks(instance, self.cfg)


def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
  """Shutdown block devices of an instance.

  This does the shutdown on all nodes of the instance.

  If ignore_primary is false, errors on the primary node are not
  ignored (they make the function return False).

  """
  result = True
  for disk in instance.disks:
    for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(top_disk, node)
      if not rpc.call_blockdev_shutdown(node, top_disk):
        logger.Error("could not shutdown block device %s on node %s" %
                     (disk.iv_name, node))
        if not ignore_primary or node != instance.primary_node:
          result = False
  return result


def _CheckNodeFreeMemory(cfg, node, reason, requested):
  """Checks if a node has enough free memory.

  This function checks if a given node has the needed amount of free
  memory. In case the node has less memory or we cannot get the
  information from the node, this function raises an OpPrereqError
  exception.

  Args:
    - cfg: a ConfigWriter instance
    - node: the node name
    - reason: string to use in the error message
    - requested: the amount of memory in MiB

  """
  nodeinfo = rpc.call_node_info([node], cfg.GetVGName())
  if not nodeinfo or not isinstance(nodeinfo, dict):
    raise errors.OpPrereqError("Could not contact node %s for resource"
                               " information" % (node,))

  free_mem = nodeinfo[node].get('memory_free')
  if not isinstance(free_mem, int):
    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
                               " was '%s'" % (node, free_mem))
  if requested > free_mem:
    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
                               " needed %s MiB, available %s MiB" %
                               (node, reason, requested, free_mem))
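

# Example usage (exactly as LUStartupInstance does below): veto the operation
# from CheckPrereq when the target node cannot hold the instance's memory:
#
#   _CheckNodeFreeMemory(self.cfg, instance.primary_node,
#                        "starting instance %s" % instance.name,
#                        instance.memory)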


class LUStartupInstance(LogicalUnit):
  """Starts an instance.

  """
  HPATH = "instance-start"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "force"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "FORCE": self.op.force,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

    _CheckNodeFreeMemory(self.cfg, instance.primary_node,
                         "starting instance %s" % instance.name,
                         instance.memory)

  def Exec(self, feedback_fn):
    """Start the instance.

    """
    instance = self.instance
    force = self.op.force
    extra_args = getattr(self.op, "extra_args", "")

    self.cfg.MarkInstanceUp(instance.name)

    node_current = instance.primary_node

    _StartInstanceDisks(self.cfg, instance, force)

    if not rpc.call_instance_start(node_current, instance, extra_args):
      _ShutdownInstanceDisks(instance, self.cfg)
      raise errors.OpExecError("Could not start instance")
class LURebootInstance(LogicalUnit):
  """Reboot an instance.

  """
  HPATH = "instance-reboot"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_SECONDARIES": self.op.ignore_secondaries,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    # check bridges existence
    _CheckInstanceBridgesExist(instance)

  def Exec(self, feedback_fn):
    """Reboot the instance.

    """
    instance = self.instance
    ignore_secondaries = self.op.ignore_secondaries
    reboot_type = self.op.reboot_type
    extra_args = getattr(self.op, "extra_args", "")

    node_current = instance.primary_node

    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
                           constants.INSTANCE_REBOOT_HARD,
                           constants.INSTANCE_REBOOT_FULL]:
      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
                                  (constants.INSTANCE_REBOOT_SOFT,
                                   constants.INSTANCE_REBOOT_HARD,
                                   constants.INSTANCE_REBOOT_FULL))

    if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                       constants.INSTANCE_REBOOT_HARD]:
      if not rpc.call_instance_reboot(node_current, instance,
                                      reboot_type, extra_args):
        raise errors.OpExecError("Could not reboot instance")
    else:
      if not rpc.call_instance_shutdown(node_current, instance):
        raise errors.OpExecError("could not shutdown instance for full reboot")
      _ShutdownInstanceDisks(instance, self.cfg)
      _StartInstanceDisks(self.cfg, instance, ignore_secondaries)
      if not rpc.call_instance_start(node_current, instance, extra_args):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance for full reboot")

    self.cfg.MarkInstanceUp(instance.name)
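# A hedged sketch (not part of the module): a client submits the matching
# opcode with one of the three reboot types checked above; the field names
# follow _OP_REQP, while the opcode class itself is defined in opcodes.py:
#
#   op = opcodes.OpRebootInstance(instance_name="inst1.example.com",
#                                 reboot_type=constants.INSTANCE_REBOOT_SOFT,
#                                 ignore_secondaries=False)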
class LUShutdownInstance(LogicalUnit):
  """Shutdown an instance.

  """
  HPATH = "instance-stop"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Shutdown the instance.

    """
    instance = self.instance
    node_current = instance.primary_node
    self.cfg.MarkInstanceDown(instance.name)
    if not rpc.call_instance_shutdown(node_current, instance):
      logger.Error("could not shutdown instance")

    _ShutdownInstanceDisks(instance, self.cfg)
class LUReinstallInstance(LogicalUnit):
  """Reinstall an instance.

  """
  HPATH = "instance-reinstall"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))

    self.op.os_type = getattr(self.op, "os_type", None)
    if self.op.os_type is not None:
      # OS verification
      pnode = self.cfg.GetNodeInfo(
        self.cfg.ExpandNodeName(instance.primary_node))
      if pnode is None:
        raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                   instance.primary_node)
      os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
      if not os_obj:
        raise errors.OpPrereqError("OS '%s' not in supported OS list for"
                                   " primary node" % self.op.os_type)

    self.instance = instance

  def Exec(self, feedback_fn):
    """Reinstall the instance.

    """
    inst = self.instance

    if self.op.os_type is not None:
      feedback_fn("Changing OS to '%s'..." % self.op.os_type)
      inst.os = self.op.os_type
      self.cfg.AddInstance(inst)

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      feedback_fn("Running the instance OS create scripts...")
      if not rpc.call_instance_os_add(inst.primary_node, inst, "sda", "sdb"):
        raise errors.OpExecError("Could not install OS for instance %s"
                                 " on node %s" %
                                 (inst.name, inst.primary_node))
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURenameInstance(LogicalUnit):
  """Rename an instance.

  """
  HPATH = "instance-rename"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "new_name"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    env["INSTANCE_NEW_NAME"] = self.op.new_name
    nl = ([self.sstore.GetMasterNode(), self.instance.primary_node] +
          list(self.instance.secondary_nodes))
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    if instance.status != "down":
      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
                                 self.op.instance_name)
    remote_info = rpc.call_instance_info(instance.primary_node, instance.name)
    if remote_info:
      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                 (self.op.instance_name,
                                  instance.primary_node))
    self.instance = instance

    # new name verification
    name_info = utils.HostInfo(self.op.new_name)

    self.op.new_name = new_name = name_info.name
    instance_list = self.cfg.GetInstanceList()
    if new_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 new_name)

    if not getattr(self.op, "ignore_ip", False):
      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (name_info.ip, new_name))

  def Exec(self, feedback_fn):
    """Rename the instance.

    """
    inst = self.instance
    old_name = inst.name

    if inst.disk_template == constants.DT_FILE:
      old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])

    self.cfg.RenameInstance(inst.name, self.op.new_name)
    # Change the instance lock. This is definitely safe while we hold the BGL
    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

    # re-read the instance from the configuration after rename
    inst = self.cfg.GetInstanceInfo(self.op.new_name)

    if inst.disk_template == constants.DT_FILE:
      new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
      result = rpc.call_file_storage_dir_rename(inst.primary_node,
                                                old_file_storage_dir,
                                                new_file_storage_dir)

      if not result:
        raise errors.OpExecError("Could not connect to node '%s' to rename"
                                 " directory '%s' to '%s' (but the instance"
                                 " has been renamed in Ganeti)" % (
                                 inst.primary_node, old_file_storage_dir,
                                 new_file_storage_dir))

      if not result[0]:
        raise errors.OpExecError("Could not rename directory '%s' to '%s'"
                                 " (but the instance has been renamed in"
                                 " Ganeti)" % (old_file_storage_dir,
                                               new_file_storage_dir))

    _StartInstanceDisks(self.cfg, inst, None)
    try:
      if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                          "sda", "sdb"):
        msg = ("Could not run OS rename script for instance %s on node %s"
               " (but the instance has been renamed in Ganeti)" %
               (inst.name, inst.primary_node))
        logger.Error(msg)
    finally:
      _ShutdownInstanceDisks(inst, self.cfg)
class LURemoveInstance(LogicalUnit):
  """Remove an instance.

  """
  HPATH = "instance-remove"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_failures"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master node only.

    """
    env = _BuildInstanceHookEnvByObject(self.instance)
    nl = [self.sstore.GetMasterNode()]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance

  def Exec(self, feedback_fn):
    """Remove the instance.

    """
    instance = self.instance
    logger.Info("shutting down instance %s on node %s" %
                (instance.name, instance.primary_node))

    if not rpc.call_instance_shutdown(instance.primary_node, instance):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't shutdown instance")
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, instance.primary_node))

    logger.Info("removing block devices for instance %s" % instance.name)

    if not _RemoveDisks(instance, self.cfg):
      if self.op.ignore_failures:
        feedback_fn("Warning: can't remove instance's disks")
      else:
        raise errors.OpExecError("Can't remove instance's disks")

    logger.Info("removing instance %s out of cluster config" % instance.name)

    self.cfg.RemoveInstance(instance.name)
    # Remove the instance from the Ganeti Lock Manager
    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
class LUQueryInstances(NoHooksLU):
  """Logical unit for querying instances.

  """
  _OP_REQP = ["output_fields", "names"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the fields required are valid output fields.

    """
    self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
                               "admin_state", "admin_ram",
                               "disk_template", "ip", "mac", "bridge",
                               "sda_size", "sdb_size", "vcpus", "tags"],
                       dynamic=self.dynamic_fields,
                       selected=self.op.output_fields)

    self.wanted = _GetWantedInstances(self, self.op.names)

  def Exec(self, feedback_fn):
    """Computes the list of nodes and their attributes.

    """
    instance_names = self.wanted
    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
                     in instance_names]

    # begin data gathering

    nodes = frozenset([inst.primary_node for inst in instance_list])

    bad_nodes = []
    if self.dynamic_fields.intersection(self.op.output_fields):
      live_data = {}
      node_data = rpc.call_all_instances_info(nodes)
      for name in nodes:
        result = node_data[name]
        if result:
          live_data.update(result)
        elif result == False:
          bad_nodes.append(name)
        # else no instance is alive
    else:
      live_data = dict([(name, {}) for name in instance_names])

    # end data gathering

    output = []
    for instance in instance_list:
      iout = []
      for field in self.op.output_fields:
        if field == "name":
          val = instance.name
        elif field == "os":
          val = instance.os
        elif field == "pnode":
          val = instance.primary_node
        elif field == "snodes":
          val = list(instance.secondary_nodes)
        elif field == "admin_state":
          val = (instance.status != "down")
        elif field == "oper_state":
          if instance.primary_node in bad_nodes:
            val = None
          else:
            val = bool(live_data.get(instance.name))
        elif field == "status":
          if instance.primary_node in bad_nodes:
            val = "ERROR_nodedown"
          else:
            running = bool(live_data.get(instance.name))
            if running:
              if instance.status != "down":
                val = "running"
              else:
                val = "ERROR_up"
            else:
              if instance.status != "down":
                val = "ERROR_down"
              else:
                val = "ADMIN_down"
        elif field == "admin_ram":
          val = instance.memory
        elif field == "oper_ram":
          if instance.primary_node in bad_nodes:
            val = None
          elif instance.name in live_data:
            val = live_data[instance.name].get("memory", "?")
          else:
            val = "-"
        elif field == "disk_template":
          val = instance.disk_template
        elif field == "ip":
          val = instance.nics[0].ip
        elif field == "bridge":
          val = instance.nics[0].bridge
        elif field == "mac":
          val = instance.nics[0].mac
        elif field == "sda_size" or field == "sdb_size":
          disk = instance.FindDisk(field[:3])
          if disk is None:
            val = None
          else:
            val = disk.size
        elif field == "vcpus":
          val = instance.vcpus
        elif field == "tags":
          val = list(instance.GetTags())
        else:
          raise errors.ParameterError(field)
        iout.append(val)
      output.append(iout)

    return output
class LUFailoverInstance(LogicalUnit):
  """Failover an instance.

  """
  HPATH = "instance-failover"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "ignore_consistency"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode()] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored, cannot failover.")

    secondary_nodes = instance.secondary_nodes
    if not secondary_nodes:
      raise errors.ProgrammerError("no secondary node but using "
                                   "a mirrored disk template")

    target_node = secondary_nodes[0]
    # check memory requirements on the secondary node
    _CheckNodeFreeMemory(self.cfg, target_node, "failing over instance %s" %
                         instance.name, instance.memory)

    # check bridge existence
    brlist = [nic.bridge for nic in instance.nics]
    if not rpc.call_bridges_exist(target_node, brlist):
      raise errors.OpPrereqError("One or more target bridges %s do not"
                                 " exist on destination node '%s'" %
                                 (brlist, target_node))

    self.instance = instance

  def Exec(self, feedback_fn):
    """Failover an instance.

    The failover is done by shutting it down on its present node and
    starting it on the secondary.

    """
    instance = self.instance

    source_node = instance.primary_node
    target_node = instance.secondary_nodes[0]

    feedback_fn("* checking disk consistency between source and target")
    for dev in instance.disks:
      # for drbd, these are drbd over lvm
      if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
        if instance.status == "up" and not self.op.ignore_consistency:
          raise errors.OpExecError("Disk %s is degraded on target node,"
                                   " aborting failover." % dev.iv_name)

    feedback_fn("* shutting down instance on source node")
    logger.Info("Shutting down instance %s on node %s" %
                (instance.name, source_node))

    if not rpc.call_instance_shutdown(source_node, instance):
      if self.op.ignore_consistency:
        logger.Error("Could not shutdown instance %s on node %s. Proceeding"
                     " anyway. Please make sure node %s is down" %
                     (instance.name, source_node, source_node))
      else:
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, source_node))

    feedback_fn("* deactivating the instance's disks on source node")
    if not _ShutdownInstanceDisks(instance, self.cfg, ignore_primary=True):
      raise errors.OpExecError("Can't shut down the instance's disks.")

    instance.primary_node = target_node
    # distribute new instance config to the other nodes
    self.cfg.Update(instance)

    # Only start the instance if it's marked as up
    if instance.status == "up":
      feedback_fn("* activating the instance's disks on target node")
      logger.Info("Starting instance %s on node %s" %
                  (instance.name, target_node))

      disks_ok, dummy = _AssembleInstanceDisks(instance, self.cfg,
                                               ignore_secondaries=True)
      if not disks_ok:
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Can't activate the instance's disks")

      feedback_fn("* starting the instance on the target node")
      if not rpc.call_instance_start(target_node, instance, None):
        _ShutdownInstanceDisks(instance, self.cfg)
        raise errors.OpExecError("Could not start instance %s on node %s." %
                                 (instance.name, target_node))
def _CreateBlockDevOnPrimary(cfg, node, instance, device, info):
  """Create a tree of block devices on the primary node.

  This always creates all devices.

  """
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnPrimary(cfg, node, instance, child, info):
        return False

  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, True, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _CreateBlockDevOnSecondary(cfg, node, instance, device, force, info):
  """Create a tree of block devices on a secondary node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  """
  if device.CreateOnSecondary():
    force = True
  if device.children:
    for child in device.children:
      if not _CreateBlockDevOnSecondary(cfg, node, instance,
                                        child, force, info):
        return False

  if not force:
    return True
  cfg.SetDiskID(device, node)
  new_id = rpc.call_blockdev_create(node, device, device.size,
                                    instance.name, False, info)
  if not new_id:
    return False
  if device.physical_id is None:
    device.physical_id = new_id
  return True
def _GenerateUniqueNames(cfg, exts):
  """Generate a suitable LV name.

  This will generate a logical volume name for the given instance.

  """
  results = []
  for val in exts:
    new_id = cfg.GenerateUniqueID()
    results.append("%s%s" % (new_id, val))
  return results
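# Example (sketch): _GenerateUniqueNames(cfg, [".sda", ".sdb"]) returns two
# names of the form "<unique-id>.sda" and "<unique-id>.sdb", one per
# requested extension, each id coming from cfg.GenerateUniqueID(); callers
# use them as LV names.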
def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
  """Generate a drbd8 device complete with its children.

  """
  port = cfg.AllocatePort()
  vgname = cfg.GetVGName()
  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                          logical_id=(vgname, names[0]))
  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                          logical_id=(vgname, names[1]))
  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
                          logical_id=(primary, secondary, port),
                          children=[dev_data, dev_meta],
                          iv_name=iv_name)
  return drbd_dev
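# The device tree built above, sketched: a single DRBD8 disk whose
# logical_id names the two nodes plus an allocated port, backed by a data
# LV of the requested size and a fixed 128 MiB metadata LV:
#
#   drbd_dev (LD_DRBD8, size, logical_id=(primary, secondary, port))
#     +- dev_data (LD_LV, size, logical_id=(vgname, names[0]))
#     +- dev_meta (LD_LV, 128,  logical_id=(vgname, names[1]))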
def _GenerateDiskTemplate(cfg, template_name,
                          instance_name, primary_node,
                          secondary_nodes, disk_sz, swap_sz,
                          file_storage_dir, file_driver):
  """Generate the entire disk layout for a given template type.

  """
  #TODO: compute space requirements

  vgname = cfg.GetVGName()
  if template_name == constants.DT_DISKLESS:
    disks = []
  elif template_name == constants.DT_PLAIN:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    names = _GenerateUniqueNames(cfg, [".sda", ".sdb"])
    sda_dev = objects.Disk(dev_type=constants.LD_LV, size=disk_sz,
                           logical_id=(vgname, names[0]),
                           iv_name="sda")
    sdb_dev = objects.Disk(dev_type=constants.LD_LV, size=swap_sz,
                           logical_id=(vgname, names[1]),
                           iv_name="sdb")
    disks = [sda_dev, sdb_dev]
  elif template_name == constants.DT_DRBD8:
    if len(secondary_nodes) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node = secondary_nodes[0]
    names = _GenerateUniqueNames(cfg, [".sda_data", ".sda_meta",
                                       ".sdb_data", ".sdb_meta"])
    drbd_sda_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        disk_sz, names[0:2], "sda")
    drbd_sdb_dev = _GenerateDRBD8Branch(cfg, primary_node, remote_node,
                                        swap_sz, names[2:4], "sdb")
    disks = [drbd_sda_dev, drbd_sdb_dev]
  elif template_name == constants.DT_FILE:
    if len(secondary_nodes) != 0:
      raise errors.ProgrammerError("Wrong template configuration")

    file_sda_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk_sz,
                                iv_name="sda", logical_id=(file_driver,
                                "%s/sda" % file_storage_dir))
    file_sdb_dev = objects.Disk(dev_type=constants.LD_FILE, size=swap_sz,
                                iv_name="sdb", logical_id=(file_driver,
                                "%s/sdb" % file_storage_dir))
    disks = [file_sda_dev, file_sdb_dev]
  else:
    raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
  return disks
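# Example (sketch): for the plain LVM template the call below would yield
# two LD_LV disks, "sda" of 10240 MiB and "sdb" of 4096 MiB, with no
# secondaries allowed; the last two arguments are only used by the file
# template, and the sizes here are hypothetical:
#
#   disks = _GenerateDiskTemplate(cfg, constants.DT_PLAIN, "inst1", "node1",
#                                 [], 10240, 4096, None, None)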
def _GetInstanceInfoText(instance):
  """Compute the text that should be added to the disk's metadata.

  """
  return "originstname+%s" % instance.name
def _CreateDisks(cfg, instance):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the creation process

  """
  info = _GetInstanceInfoText(instance)

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = rpc.call_file_storage_dir_create(instance.primary_node,
                                              file_storage_dir)

    if not result:
      logger.Error("Could not connect to node '%s'" % instance.primary_node)
      return False

    if not result[0]:
      logger.Error("failed to create directory '%s'" % file_storage_dir)
      return False

  for device in instance.disks:
    logger.Info("creating volume %s for instance %s" %
                (device.iv_name, instance.name))
    #HARDCODE
    for secondary_node in instance.secondary_nodes:
      if not _CreateBlockDevOnSecondary(cfg, secondary_node, instance,
                                        device, False, info):
        logger.Error("failed to create volume %s (%s) on secondary node %s!" %
                     (device.iv_name, device, secondary_node))
        return False
    #HARDCODE
    if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
                                    instance, device, info):
      logger.Error("failed to create volume %s on primary!" %
                   device.iv_name)
      return False

  return True
def _RemoveDisks(instance, cfg):
  """Remove all disks for an instance.

  This abstracts away some work from `AddInstance()` and
  `RemoveInstance()`. Note that in case some of the devices couldn't
  be removed, the removal will continue with the other ones (compare
  with `_CreateDisks()`).

  Args:
    instance: the instance object

  Returns:
    True or False showing the success of the removal process

  """
  logger.Info("removing block devices for instance %s" % instance.name)

  result = True
  for device in instance.disks:
    for node, disk in device.ComputeNodeTree(instance.primary_node):
      cfg.SetDiskID(disk, node)
      if not rpc.call_blockdev_remove(node, disk):
        logger.Error("could not remove block device %s on node %s,"
                     " continuing anyway" %
                     (device.iv_name, node))
        result = False

  if instance.disk_template == constants.DT_FILE:
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    if not rpc.call_file_storage_dir_remove(instance.primary_node,
                                            file_storage_dir):
      logger.Error("could not remove directory '%s'" % file_storage_dir)
      result = False

  return result
def _ComputeDiskSize(disk_template, disk_size, swap_size):
  """Compute disk size requirements in the volume group.

  This is currently hard-coded for the two-drive layout.

  """
  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: None,
    constants.DT_PLAIN: disk_size + swap_size,
    # 256 MB are added for drbd metadata, 128 MB for each drbd device
    constants.DT_DRBD8: disk_size + swap_size + 256,
    constants.DT_FILE: None,
  }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
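# Worked example (doctest-style sketch): a DRBD8 layout must hold the data
# disk, the swap disk, plus 256 MiB of drbd metadata in the volume group:
#
#   >>> _ComputeDiskSize(constants.DT_DRBD8, 10240, 4096)
#   14592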
class LUCreateInstance(LogicalUnit):
  """Create an instance.

  """
  HPATH = "instance-add"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mem_size", "disk_size",
              "disk_template", "swap_size", "mode", "start", "vcpus",
              "wait_for_sync", "ip_check", "mac"]

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    """
    disks = [{"size": self.op.disk_size, "mode": "w"},
             {"size": self.op.swap_size, "mode": "w"}]
    nics = [{"mac": self.op.mac, "ip": getattr(self.op, "ip", None),
             "bridge": self.op.bridge}]
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_ALLOC,
                     name=self.op.instance_name,
                     disk_template=self.op.disk_template,
                     tags=[],
                     os=self.op.os_type,
                     vcpus=self.op.vcpus,
                     mem_size=self.op.mem_size,
                     disks=disks,
                     nics=nics,
                     )

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.pnode = ial.nodes[0]
    logger.ToStdout("Selected nodes for the instance: %s" %
                    (", ".join(ial.nodes),))
    logger.Info("Selected nodes for instance %s via iallocator %s: %s" %
                (self.op.instance_name, self.op.iallocator, ial.nodes))
    if ial.required_nodes == 2:
      self.op.snode = ial.nodes[1]
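  # Example (sketch): after a successful run, ial.nodes holds the chosen
  # node names, e.g. ["node2.example.com", "node3.example.com"] for a
  # two-node (DRBD) request; the code above maps these onto self.op.pnode
  # and self.op.snode.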
  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    env = {
      "INSTANCE_DISK_TEMPLATE": self.op.disk_template,
      "INSTANCE_DISK_SIZE": self.op.disk_size,
      "INSTANCE_SWAP_SIZE": self.op.swap_size,
      "INSTANCE_ADD_MODE": self.op.mode,
      }
    if self.op.mode == constants.INSTANCE_IMPORT:
      env["INSTANCE_SRC_NODE"] = self.op.src_node
      env["INSTANCE_SRC_PATH"] = self.op.src_path
      env["INSTANCE_SRC_IMAGE"] = self.src_image

    env.update(_BuildInstanceHookEnv(name=self.op.instance_name,
                                     primary_node=self.op.pnode,
                                     secondary_nodes=self.secondaries,
                                     status=self.instance_status,
                                     os_type=self.op.os_type,
                                     memory=self.op.mem_size,
                                     vcpus=self.op.vcpus,
                                     nics=[(self.inst_ip, self.op.bridge,
                                            self.op.mac)],
                                     ))

    nl = ([self.sstore.GetMasterNode(), self.op.pnode] +
          self.secondaries)
    return env, nl, nl
  def CheckPrereq(self):
    """Check prerequisites.

    """
    # set optional parameters to none if they don't exist
    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
                 "iallocator", "hvm_acpi", "hvm_pae", "hvm_cdrom_image_path",
                 "vnc_bind_address"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)

    if self.op.mode not in (constants.INSTANCE_CREATE,
                            constants.INSTANCE_IMPORT):
      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                 self.op.mode)

    if (not self.cfg.GetVGName() and
        self.op.disk_template not in constants.DTS_NOT_LVM):
      raise errors.OpPrereqError("Cluster does not support lvm-based"
                                 " instances")

    if self.op.mode == constants.INSTANCE_IMPORT:
      src_node = getattr(self.op, "src_node", None)
      src_path = getattr(self.op, "src_path", None)
      if src_node is None or src_path is None:
        raise errors.OpPrereqError("Importing an instance requires source"
                                   " node and path options")
      src_node_full = self.cfg.ExpandNodeName(src_node)
      if src_node_full is None:
        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
      self.op.src_node = src_node = src_node_full

      if not os.path.isabs(src_path):
        raise errors.OpPrereqError("The source path must be absolute")

      export_info = rpc.call_export_info(src_node, src_path)

      if not export_info:
        raise errors.OpPrereqError("No export found in dir %s" % src_path)

      if not export_info.has_section(constants.INISECT_EXP):
        raise errors.ProgrammerError("Corrupted export config")

      ei_version = export_info.get(constants.INISECT_EXP, 'version')
      if (int(ei_version) != constants.EXPORT_VERSION):
        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
                                   (ei_version, constants.EXPORT_VERSION))

      if int(export_info.get(constants.INISECT_INS, 'disk_count')) > 1:
        raise errors.OpPrereqError("Can't import instance with more than"
                                   " one data disk")

      # FIXME: are the old os-es, disk sizes, etc. useful?
      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
      diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                         'disk0_dump'))
      self.src_image = diskimage
    else: # INSTANCE_CREATE
      if getattr(self.op, "os_type", None) is None:
        raise errors.OpPrereqError("No guest OS specified")

    #### instance parameters check

    # disk template and mirror node verification
    if self.op.disk_template not in constants.DISK_TEMPLATES:
      raise errors.OpPrereqError("Invalid disk template name")

    # instance name verification
    hostname1 = utils.HostInfo(self.op.instance_name)

    self.op.instance_name = instance_name = hostname1.name
    instance_list = self.cfg.GetInstanceList()
    if instance_name in instance_list:
      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
                                 instance_name)

    # ip validity checks
    ip = getattr(self.op, "ip", None)
    if ip is None or ip.lower() == "none":
      inst_ip = None
    elif ip.lower() == "auto":
      inst_ip = hostname1.ip
    else:
      if not utils.IsValidIP(ip):
        raise errors.OpPrereqError("given IP address '%s' doesn't look"
                                   " like a valid IP" % ip)
      inst_ip = ip
    self.inst_ip = self.op.ip = inst_ip

    if self.op.start and not self.op.ip_check:
      raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                 " adding an instance in start mode")

    if self.op.ip_check:
      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                   (hostname1.ip, instance_name))

    # MAC address verification
    if self.op.mac != "auto":
      if not utils.IsValidMac(self.op.mac.lower()):
        raise errors.OpPrereqError("invalid MAC address specified: %s" %
                                   self.op.mac)

    # bridge verification
    bridge = getattr(self.op, "bridge", None)
    if bridge is None:
      self.op.bridge = self.cfg.GetDefBridge()
    else:
      self.op.bridge = bridge

    # boot order verification
    if self.op.hvm_boot_order is not None:
      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
        raise errors.OpPrereqError("invalid boot order specified,"
                                   " must be one or more of [acdn]")
    # file storage checks
    if (self.op.file_driver and
        not self.op.file_driver in constants.FILE_DRIVER):
      raise errors.OpPrereqError("Invalid file driver name '%s'" %
                                 self.op.file_driver)

    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
      raise errors.OpPrereqError("File storage directory not a relative"
                                 " path")

    if [self.op.iallocator, self.op.pnode].count(None) != 1:
      raise errors.OpPrereqError("One and only one of iallocator and primary"
                                 " node must be given")

    if self.op.iallocator is not None:
      self._RunAllocator()

    #### node related checks

    # check primary node
    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
    if pnode is None:
      raise errors.OpPrereqError("Primary node '%s' is unknown" %
                                 self.op.pnode)
    self.op.pnode = pnode.name
    self.pnode = pnode
    self.secondaries = []

    # mirror node verification
    if self.op.disk_template in constants.DTS_NET_MIRROR:
      if getattr(self.op, "snode", None) is None:
        raise errors.OpPrereqError("The networked disk templates need"
                                   " a mirror node")

      snode_name = self.cfg.ExpandNodeName(self.op.snode)
      if snode_name is None:
        raise errors.OpPrereqError("Unknown secondary node '%s'" %
                                   self.op.snode)
      elif snode_name == pnode.name:
        raise errors.OpPrereqError("The secondary node cannot be"
                                   " the primary node.")
      self.secondaries.append(snode_name)

    req_size = _ComputeDiskSize(self.op.disk_template,
                                self.op.disk_size, self.op.swap_size)

    # Check lv size requirements
    if req_size is not None:
      nodenames = [pnode.name] + self.secondaries
      nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
      for node in nodenames:
        info = nodeinfo.get(node, None)
        if not info:
          raise errors.OpPrereqError("Cannot get current information"
                                     " from node '%s'" % node)
        vg_free = info.get('vg_free', None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node)
        if req_size > info['vg_free']:
          raise errors.OpPrereqError("Not enough disk space on target node %s."
                                     " %d MB available, %d MB required" %
                                     (node, info['vg_free'], req_size))

    # os verification
    os_obj = rpc.call_os_get(pnode.name, self.op.os_type)
    if not os_obj:
      raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                 " primary node" % self.op.os_type)

    if self.op.kernel_path == constants.VALUE_NONE:
      raise errors.OpPrereqError("Can't set instance kernel to none")

    # bridge check on primary node
    if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
      raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                 " destination node '%s'" %
                                 (self.op.bridge, pnode.name))

    # memory check on primary node
    if self.op.start:
      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
                           "creating instance %s" % self.op.instance_name,
                           self.op.mem_size)

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    if self.op.start:
      self.instance_status = 'up'
    else:
      self.instance_status = 'down'
  def Exec(self, feedback_fn):
    """Create and add the instance to the cluster.

    """
    instance = self.op.instance_name
    pnode_name = self.pnode.name

    if self.op.mac == "auto":
      mac_address = self.cfg.GenerateMAC()
    else:
      mac_address = self.op.mac

    nic = objects.NIC(bridge=self.op.bridge, mac=mac_address)
    if self.inst_ip is not None:
      nic.ip = self.inst_ip

    ht_kind = self.sstore.GetHypervisorType()
    if ht_kind in constants.HTS_REQ_PORT:
      network_port = self.cfg.AllocatePort()
    else:
      network_port = None

    if self.op.vnc_bind_address is None:
      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS

    # this is needed because os.path.join does not accept None arguments
    if self.op.file_storage_dir is None:
      string_file_storage_dir = ""
    else:
      string_file_storage_dir = self.op.file_storage_dir

    # build the full file storage dir path
    file_storage_dir = os.path.normpath(os.path.join(
                                        self.sstore.GetFileStorageDir(),
                                        string_file_storage_dir, instance))

    disks = _GenerateDiskTemplate(self.cfg,
                                  self.op.disk_template,
                                  instance, pnode_name,
                                  self.secondaries, self.op.disk_size,
                                  self.op.swap_size,
                                  file_storage_dir,
                                  self.op.file_driver)

    iobj = objects.Instance(name=instance, os=self.op.os_type,
                            primary_node=pnode_name,
                            memory=self.op.mem_size,
                            vcpus=self.op.vcpus,
                            nics=[nic], disks=disks,
                            disk_template=self.op.disk_template,
                            status=self.instance_status,
                            network_port=network_port,
                            kernel_path=self.op.kernel_path,
                            initrd_path=self.op.initrd_path,
                            hvm_boot_order=self.op.hvm_boot_order,
                            hvm_acpi=self.op.hvm_acpi,
                            hvm_pae=self.op.hvm_pae,
                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
                            vnc_bind_address=self.op.vnc_bind_address,
                            )

    feedback_fn("* creating instance disks...")
    if not _CreateDisks(self.cfg, iobj):
      _RemoveDisks(iobj, self.cfg)
      raise errors.OpExecError("Device creation failed, reverting...")

    feedback_fn("adding instance %s to cluster config" % instance)

    self.cfg.AddInstance(iobj)
    # Add the new instance to the Ganeti Lock Manager
    self.context.glm.add(locking.LEVEL_INSTANCE, instance)

    if self.op.wait_for_sync:
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
    elif iobj.disk_template in constants.DTS_NET_MIRROR:
      # make sure the disks are not degraded (still sync-ing is ok)
      time.sleep(15)
      feedback_fn("* checking mirrors status")
      disk_abort = not _WaitForSync(self.cfg, iobj, self.proc, oneshot=True)
    else:
      disk_abort = False

    if disk_abort:
      _RemoveDisks(iobj, self.cfg)
      self.cfg.RemoveInstance(iobj.name)
      # Remove the new instance from the Ganeti Lock Manager
      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
      raise errors.OpExecError("There are some degraded disks for"
                               " this instance")

    feedback_fn("creating os for instance %s on node %s" %
                (instance, pnode_name))

    if iobj.disk_template != constants.DT_DISKLESS:
      if self.op.mode == constants.INSTANCE_CREATE:
        feedback_fn("* running the instance OS create scripts...")
        if not rpc.call_instance_os_add(pnode_name, iobj, "sda", "sdb"):
          raise errors.OpExecError("could not add os for instance %s"
                                   " on node %s" %
                                   (instance, pnode_name))

      elif self.op.mode == constants.INSTANCE_IMPORT:
        feedback_fn("* running the instance OS import scripts...")
        src_node = self.op.src_node
        src_image = self.src_image
        if not rpc.call_instance_os_import(pnode_name, iobj, "sda", "sdb",
                                           src_node, src_image):
          raise errors.OpExecError("Could not import os for instance"
                                   " %s on node %s" %
                                   (instance, pnode_name))
      else:
        # also checked in the prereq part
        raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
                                     % self.op.mode)

    if self.op.start:
      logger.Info("starting instance %s on node %s" % (instance, pnode_name))
      feedback_fn("* starting instance...")
      if not rpc.call_instance_start(pnode_name, iobj, None):
        raise errors.OpExecError("Could not start instance")
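# A hedged sketch (not part of the module): the required creation parameters
# are exactly those listed in _OP_REQP above; a minimal request could look
# like this, with every value hypothetical:
#
#   op = opcodes.OpCreateInstance(instance_name="inst1.example.com",
#                                 mem_size=512, disk_size=10240,
#                                 swap_size=4096, disk_template="drbd",
#                                 mode=constants.INSTANCE_CREATE, start=True,
#                                 vcpus=1, wait_for_sync=True, ip_check=True,
#                                 mac="auto")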
class LUConnectConsole(NoHooksLU):
  """Connect to an instance's console.

  This is somewhat special in that it returns the command line that
  you need to run on the master node in order to connect to the
  console.

  """
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Connect to the console of an instance

    """
    instance = self.instance
    node = instance.primary_node

    node_insts = rpc.call_instance_list([node])[node]
    if node_insts is False:
      raise errors.OpExecError("Can't connect to node %s." % node)

    if instance.name not in node_insts:
      raise errors.OpExecError("Instance %s is not running." % instance.name)

    logger.Debug("connecting to console of %s on %s" % (instance.name, node))

    hyper = hypervisor.GetHypervisor()
    console_cmd = hyper.GetShellCommandForConsole(instance)

    # build ssh cmdline
    return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
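# Example (sketch): the returned value is the full SSH command line the
# client should exec on the master, conceptually something like
#
#   ssh -t root@node1.example.com 'xm console inst1.example.com'
#
# on a Xen cluster; the exact console command comes from the hypervisor's
# GetShellCommandForConsole().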
class LUReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "mode", "disks"]

  def _RunAllocator(self):
    """Compute a new secondary node using an IAllocator.

    """
    ial = IAllocator(self.cfg, self.sstore,
                     mode=constants.IALLOCATOR_MODE_RELOC,
                     name=self.op.instance_name,
                     relocate_from=[self.sec_node])

    ial.Run(self.op.iallocator)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using"
                                 " iallocator '%s': %s" % (self.op.iallocator,
                                                           ial.info))
    if len(ial.nodes) != ial.required_nodes:
      raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                 " of nodes (%s), required %s" %
                                 (self.op.iallocator, len(ial.nodes),
                                  ial.required_nodes))
    self.op.remote_node = ial.nodes[0]
    logger.ToStdout("Selected new secondary for the instance: %s" %
                    self.op.remote_node)

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.instance.secondary_nodes[0],
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    if self.op.remote_node is not None:
      nl.append(self.op.remote_node)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    if not hasattr(self.op, "remote_node"):
      self.op.remote_node = None

    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Instance's disk layout is not"
                                 " network mirrored.")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(instance.secondary_nodes))

    self.sec_node = instance.secondary_nodes[0]

    ia_name = getattr(self.op, "iallocator", None)
    if ia_name is not None:
      if self.op.remote_node is not None:
        raise errors.OpPrereqError("Give either the iallocator or the new"
                                   " secondary, not both")
      self._RunAllocator()

    remote_node = self.op.remote_node
    if remote_node is not None:
      remote_node = self.cfg.ExpandNodeName(remote_node)
      if remote_node is None:
        raise errors.OpPrereqError("Node '%s' not known" %
                                   self.op.remote_node)
      self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
    else:
      self.remote_node_info = None
    if remote_node == instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance.")
    elif remote_node == self.sec_node:
      if self.op.mode == constants.REPLACE_DISK_SEC:
        # this is for DRBD8, where we can't execute the same mode of
        # replacement as for drbd7 (no different port allocated)
        raise errors.OpPrereqError("Same secondary given, cannot execute"
                                   " replacement")
    if instance.disk_template == constants.DT_DRBD8:
      if (self.op.mode == constants.REPLACE_DISK_ALL and
          remote_node is not None):
        # switch to replace secondary mode
        self.op.mode = constants.REPLACE_DISK_SEC

      if self.op.mode == constants.REPLACE_DISK_ALL:
        raise errors.OpPrereqError("Template 'drbd' only allows primary or"
                                   " secondary disk replacement, not"
                                   " both at once")
      elif self.op.mode == constants.REPLACE_DISK_PRI:
        if remote_node is not None:
          raise errors.OpPrereqError("Template 'drbd' does not allow changing"
                                     " the secondary while doing a primary"
                                     " node disk replacement")
        self.tgt_node = instance.primary_node
        self.oth_node = instance.secondary_nodes[0]
      elif self.op.mode == constants.REPLACE_DISK_SEC:
        self.new_node = remote_node # this can be None, in which case
                                    # we don't change the secondary
        self.tgt_node = instance.secondary_nodes[0]
        self.oth_node = instance.primary_node
      else:
        raise errors.ProgrammerError("Unhandled disk replace mode")

    for name in self.op.disks:
      if instance.FindDisk(name) is None:
        raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                   (name, instance.name))
    self.op.remote_node = remote_node
  def _ExecD8DiskOnly(self, feedback_fn):
    """Replace a disk on the primary or secondary for drbd8.

    The algorithm for replace is quite complicated:
      - for each disk to be replaced:
        - create new LVs on the target node with unique names
        - detach old LVs from the drbd device
        - rename old LVs to name_replaced.<time_t>
        - rename new LVs to old LVs
        - attach the new LVs (with the old names now) to the drbd device
      - wait for sync across all devices
      - for each modified disk:
        - remove old LVs (which have the name name_replaced.<time_t>)

    Failures are not very well handled.

    """
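    # A sketch of the step-4 rename dance below, with T = int(time.time()):
    #
    #   old_lv -> old_lv_replaced-T   (frees the original LV name)
    #   new_lv -> old_lv              (new storage takes over the old name)
    #
    # so the drbd device re-attaches local storage under unchanged names,
    # and the "*_replaced-T" leftovers are removed in step 6.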
    steps_total = 6
    warning, info = (self.proc.LogWarning, self.proc.LogInfo)
    instance = self.instance
    iv_names = {}
    vgname = self.cfg.GetVGName()
    # start of work
    cfg = self.cfg
    tgt_node = self.tgt_node
    oth_node = self.oth_node

    # Step: check device activation
    self.proc.LogStep(1, steps_total, "check device existence")
    info("checking volume groups")
    my_vg = cfg.GetVGName()
    results = rpc.call_vg_list([oth_node, tgt_node])
    if not results:
      raise errors.OpExecError("Can't list volume groups on the nodes")
    for node in oth_node, tgt_node:
      res = results.get(node, False)
      if not res or my_vg not in res:
        raise errors.OpExecError("Volume group '%s' not found on %s" %
                                 (my_vg, node))
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      for node in tgt_node, oth_node:
        info("checking %s on %s" % (dev.iv_name, node))
        cfg.SetDiskID(dev, node)
        if not rpc.call_blockdev_find(node, dev):
          raise errors.OpExecError("Can't find device %s on node %s" %
                                   (dev.iv_name, node))

    # Step: check other node consistency
    self.proc.LogStep(2, steps_total, "check peer consistency")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      info("checking %s consistency on %s" % (dev.iv_name, oth_node))
      if not _CheckDiskConsistency(self.cfg, dev, oth_node,
                                   oth_node==instance.primary_node):
        raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
                                 " to replace disks on this node (%s)" %
                                 (oth_node, tgt_node))

    # Step: create new storage
    self.proc.LogStep(3, steps_total, "allocate new storage")
    for dev in instance.disks:
      if not dev.iv_name in self.op.disks:
        continue
      size = dev.size
      cfg.SetDiskID(dev, tgt_node)
      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
      names = _GenerateUniqueNames(cfg, lv_names)
      lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
                             logical_id=(vgname, names[0]))
      lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
                             logical_id=(vgname, names[1]))
      new_lvs = [lv_data, lv_meta]
      old_lvs = dev.children
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      info("creating new local storage on %s for %s" %
           (tgt_node, dev.iv_name))
      # since we *always* want to create this LV, we use the
      # _Create...OnPrimary (which forces the creation), even if we
      # are talking about the secondary node
      for new_lv in new_lvs:
        if not _CreateBlockDevOnPrimary(cfg, tgt_node, instance, new_lv,
                                        _GetInstanceInfoText(instance)):
          raise errors.OpExecError("Failed to create new LV named '%s' on"
                                   " node '%s'" %
                                   (new_lv.logical_id[1], tgt_node))

    # Step: for each lv, detach+rename*2+attach
    self.proc.LogStep(4, steps_total, "change drbd configuration")
    for dev, old_lvs, new_lvs in iv_names.itervalues():
      info("detaching %s drbd from local storage" % dev.iv_name)
      if not rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs):
        raise errors.OpExecError("Can't detach drbd from local storage on node"
                                 " %s for device %s" % (tgt_node, dev.iv_name))
      #dev.children = []
      #cfg.Update(instance)

      # ok, we created the new LVs, so now we know we have the needed
      # storage; as such, we proceed on the target node to rename
      # old_lv to _old, and new_lv to old_lv; note that we rename LVs
      # using the assumption that logical_id == physical_id (which in
      # turn is the unique_id on that node)

      # FIXME(iustin): use a better name for the replaced LVs
      temp_suffix = int(time.time())
      ren_fn = lambda d, suff: (d.physical_id[0],
                                d.physical_id[1] + "_replaced-%s" % suff)
      # build the rename list based on what LVs exist on the node
      rlist = []
      for to_ren in old_lvs:
        find_res = rpc.call_blockdev_find(tgt_node, to_ren)
        if find_res is not None: # device exists
          rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))

      info("renaming the old LVs on the target node")
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
      # now we rename the new LVs to the old LVs
      info("renaming the new LVs on the target node")
      rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
      if not rpc.call_blockdev_rename(tgt_node, rlist):
        raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)

      for old, new in zip(old_lvs, new_lvs):
        new.logical_id = old.logical_id
        cfg.SetDiskID(new, tgt_node)

      for disk in old_lvs:
        disk.logical_id = ren_fn(disk, temp_suffix)
        cfg.SetDiskID(disk, tgt_node)

      # now that the new lvs have the old name, we can add them to the device
      info("adding new mirror component on %s" % tgt_node)
      if not rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs):
        for new_lv in new_lvs:
          if not rpc.call_blockdev_remove(tgt_node, new_lv):
            warning("Can't rollback device %s", hint="manually cleanup unused"
                    " logical volumes")
        raise errors.OpExecError("Can't add local storage to drbd")

      dev.children = new_lvs
      cfg.Update(instance)

    # Step: wait for sync

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, instance.primary_node)
      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    # Step: remove old storage
    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, tgt_node)
        if not rpc.call_blockdev_remove(tgt_node, lv):
          warning("Can't remove old LV", hint="manually remove unused LVs")
          continue
  def _ExecD8Secondary(self, feedback_fn):
    """Replace the secondary node for drbd8.

    The algorithm for replace is quite complicated:
      - for all disks of the instance:
        - create new LVs on the new node with same names
        - shutdown the drbd device on the old secondary
        - disconnect the drbd network on the primary
        - create the drbd device on the new secondary
        - network attach the drbd on the primary, using an artifice:
          the drbd code for Attach() will connect to the network if it
          finds a device which is connected to the good local disks but
          not network enabled
      - wait for sync across all devices
      - remove all disks from the old secondary

    Failures are not very well handled.

    """
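    # A sketch of the re-attach trick used below: blanking physical_id puts
    # the primary's drbd into standalone mode; after logical_id is rewritten
    # to point at (pri_node, new_node), a plain blockdev_find on the primary
    # "fixes" the device and reconnects it to the new secondary.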
3739 warning, info = (self.proc.LogWarning, self.proc.LogInfo)
3740 instance = self.instance
3742 vgname = self.cfg.GetVGName()
3745 old_node = self.tgt_node
3746 new_node = self.new_node
3747 pri_node = instance.primary_node
3749 # Step: check device activation
3750 self.proc.LogStep(1, steps_total, "check device existence")
3751 info("checking volume groups")
3752 my_vg = cfg.GetVGName()
3753 results = rpc.call_vg_list([pri_node, new_node])
3755 raise errors.OpExecError("Can't list volume groups on the nodes")
3756 for node in pri_node, new_node:
3757 res = results.get(node, False)
3758 if not res or my_vg not in res:
3759 raise errors.OpExecError("Volume group '%s' not found on %s" %
3761 for dev in instance.disks:
3762 if not dev.iv_name in self.op.disks:
3764 info("checking %s on %s" % (dev.iv_name, pri_node))
3765 cfg.SetDiskID(dev, pri_node)
3766 if not rpc.call_blockdev_find(pri_node, dev):
3767 raise errors.OpExecError("Can't find device %s on node %s" %
3768 (dev.iv_name, pri_node))
3770 # Step: check other node consistency
3771 self.proc.LogStep(2, steps_total, "check peer consistency")
3772 for dev in instance.disks:
3773 if not dev.iv_name in self.op.disks:
3775 info("checking %s consistency on %s" % (dev.iv_name, pri_node))
3776 if not _CheckDiskConsistency(self.cfg, dev, pri_node, True, ldisk=True):
3777 raise errors.OpExecError("Primary node (%s) has degraded storage,"
3778 " unsafe to replace the secondary" %
3781 # Step: create new storage
3782 self.proc.LogStep(3, steps_total, "allocate new storage")
3783 for dev in instance.disks:
3785 info("adding new local storage on %s for %s" % (new_node, dev.iv_name))
3786 # since we *always* want to create this LV, we use the
3787 # _Create...OnPrimary (which forces the creation), even if we
3788 # are talking about the secondary node
3789 for new_lv in dev.children:
3790 if not _CreateBlockDevOnPrimary(cfg, new_node, instance, new_lv,
3791 _GetInstanceInfoText(instance)):
3792 raise errors.OpExecError("Failed to create new LV named '%s' on"
3794 (new_lv.logical_id[1], new_node))
3796 iv_names[dev.iv_name] = (dev, dev.children)
3798 self.proc.LogStep(4, steps_total, "changing drbd configuration")
3799 for dev in instance.disks:
3801 info("activating a new drbd on %s for %s" % (new_node, dev.iv_name))
3802 # create new devices on new_node
3803 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
3804 logical_id=(pri_node, new_node,
3806 children=dev.children)
3807 if not _CreateBlockDevOnSecondary(cfg, new_node, instance,
3809 _GetInstanceInfoText(instance)):
3810 raise errors.OpExecError("Failed to create new DRBD on"
3811 " node '%s'" % new_node)
3813 for dev in instance.disks:
3814 # we have new devices, shutdown the drbd on the old secondary
3815 info("shutting down drbd for %s on old node" % dev.iv_name)
3816 cfg.SetDiskID(dev, old_node)
3817 if not rpc.call_blockdev_shutdown(old_node, dev):
3818 warning("Failed to shutdown drbd for %s on old node" % dev.iv_name,
3819 hint="Please cleanup this device manually as soon as possible")
3821 info("detaching primary drbds from the network (=> standalone)")
3823 for dev in instance.disks:
3824 cfg.SetDiskID(dev, pri_node)
3825 # set the physical (unique in bdev terms) id to None, meaning
3826 # detach from network
3827 dev.physical_id = (None,) * len(dev.physical_id)
3828 # and 'find' the device, which will 'fix' it to match the
3830 if rpc.call_blockdev_find(pri_node, dev):
3833 warning("Failed to detach drbd %s from network, unusual case" %
3837 # no detaches succeeded (very unlikely)
3838 raise errors.OpExecError("Can't detach at least one DRBD from old node")
3840 # if we managed to detach at least one, we update all the disks of
3841 # the instance to point to the new secondary
3842 info("updating instance configuration")
3843 for dev in instance.disks:
3844 dev.logical_id = (pri_node, new_node) + dev.logical_id[2:]
3845 cfg.SetDiskID(dev, pri_node)
3846 cfg.Update(instance)
3848 # and now perform the drbd attach
3849 info("attaching primary drbds to new secondary (standalone => connected)")
3851 for dev in instance.disks:
3852 info("attaching primary drbd for %s to new secondary node" % dev.iv_name)
3853 # since the attach is smart, it's enough to 'find' the device,
3854 # it will automatically activate the network, if the physical_id
3856 cfg.SetDiskID(dev, pri_node)
3857 if not rpc.call_blockdev_find(pri_node, dev):
3858 warning("can't attach drbd %s to new secondary!" % dev.iv_name,
3859 "please do a gnt-instance info to see the status of disks")

    # this can fail as the old devices are degraded and _WaitForSync
    # does a combined result over all disks, so we don't check its
    # return value
    self.proc.LogStep(5, steps_total, "sync devices")
    _WaitForSync(cfg, instance, self.proc, unlock=True)

    # so check manually all the devices
    for name, (dev, old_lvs) in iv_names.iteritems():
      cfg.SetDiskID(dev, pri_node)
      is_degr = rpc.call_blockdev_find(pri_node, dev)[5]
      if is_degr:
        raise errors.OpExecError("DRBD device %s is degraded!" % name)

    self.proc.LogStep(6, steps_total, "removing old storage")
    for name, (dev, old_lvs) in iv_names.iteritems():
      info("remove logical volumes for %s" % name)
      for lv in old_lvs:
        cfg.SetDiskID(lv, old_node)
        if not rpc.call_blockdev_remove(old_node, lv):
          warning("Can't remove LV on old secondary",
                  hint="Cleanup stale volumes by hand")

  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    instance = self.instance

    # Activate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpActivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    if instance.disk_template == constants.DT_DRBD8:
      if self.op.remote_node is None:
        fn = self._ExecD8DiskOnly
      else:
        fn = self._ExecD8Secondary
    else:
      raise errors.ProgrammerError("Unhandled disk replacement case")

    ret = fn(feedback_fn)

    # Deactivate the instance disks if we're replacing them on a down instance
    if instance.status == "down":
      op = opcodes.OpDeactivateInstanceDisks(instance_name=instance.name)
      self.proc.ChainOpCode(op)

    return ret


class LUGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "disk", "amount"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [
      self.sstore.GetMasterNode(),
      self.instance.primary_node,
      ]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    instance = self.cfg.GetInstanceInfo(
      self.cfg.ExpandInstanceName(self.op.instance_name))
    if instance is None:
      raise errors.OpPrereqError("Instance '%s' not known" %
                                 self.op.instance_name)
    self.instance = instance
    self.op.instance_name = instance.name

    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing.")

    if instance.FindDisk(self.op.disk) is None:
      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                 (self.op.disk, instance.name))

    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
    for node in nodenames:
      info = nodeinfo.get(node, None)
      if not info:
        raise errors.OpPrereqError("Cannot get current information"
                                   " from node '%s'" % node)
      vg_free = info.get('vg_free', None)
      if not isinstance(vg_free, int):
        raise errors.OpPrereqError("Can't compute free disk space on"
                                   " node %s" % node)
      if self.op.amount > vg_free:
        raise errors.OpPrereqError("Not enough disk space on target node %s:"
                                   " %d MiB available, %d MiB required" %
                                   (node, vg_free, self.op.amount))

  def Exec(self, feedback_fn):
    """Execute disk grow.

    """
    instance = self.instance
    disk = instance.FindDisk(self.op.disk)
    for node in (instance.secondary_nodes + (instance.primary_node,)):
      self.cfg.SetDiskID(disk, node)
      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
      if not result or not isinstance(result, tuple) or len(result) != 2:
        raise errors.OpExecError("grow request failed to node %s" % node)
      elif not result[0]:
        raise errors.OpExecError("grow request failed to node %s: %s" %
                                 (node, result[1]))
    disk.RecordGrow(self.op.amount)
    self.cfg.Update(instance)
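
    # Illustrative invocation through the matching opcode (assuming the
    # usual LU/opcode naming, i.e. opcodes.OpGrowDisk; the amount is in
    # MiB, and the names are hypothetical):
    #   op = opcodes.OpGrowDisk(instance_name="inst1.example.com",
    #                           disk="sda", amount=1024)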


class LUQueryInstanceData(NoHooksLU):
  """Query runtime instance data.

  """
  _OP_REQP = ["instances"]

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if not isinstance(self.op.instances, list):
      raise errors.OpPrereqError("Invalid argument type 'instances'")
    if self.op.instances:
      self.wanted_instances = []
      names = self.op.instances
      for name in names:
        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
        if instance is None:
          raise errors.OpPrereqError("No such instance name '%s'" % name)
        self.wanted_instances.append(instance)
    else:
      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
                               in self.cfg.GetInstanceList()]

  def _ComputeDiskStatus(self, instance, snode, dev):
    """Compute block device status.

    """
    self.cfg.SetDiskID(dev, instance.primary_node)
    dev_pstatus = rpc.call_blockdev_find(instance.primary_node, dev)
    if dev.dev_type in constants.LDS_DRBD:
      # we change the snode then (otherwise we use the one passed in)
      if dev.logical_id[0] == instance.primary_node:
        snode = dev.logical_id[1]
      else:
        snode = dev.logical_id[0]

    if snode:
      self.cfg.SetDiskID(dev, snode)
      dev_sstatus = rpc.call_blockdev_find(snode, dev)
    else:
      dev_sstatus = None

    if dev.children:
      dev_children = [self._ComputeDiskStatus(instance, snode, child)
                      for child in dev.children]
    else:
      dev_children = []

    data = {
      "iv_name": dev.iv_name,
      "dev_type": dev.dev_type,
      "logical_id": dev.logical_id,
      "physical_id": dev.physical_id,
      "pstatus": dev_pstatus,
      "sstatus": dev_sstatus,
      "children": dev_children,
      }
    return data

  def Exec(self, feedback_fn):
    """Gather and return data"""
    result = {}
    for instance in self.wanted_instances:
      remote_info = rpc.call_instance_info(instance.primary_node,
                                           instance.name)
      if remote_info and "state" in remote_info:
        remote_state = "up"
      else:
        remote_state = "down"
      if instance.status == "down":
        config_state = "down"
      else:
        config_state = "up"

      disks = [self._ComputeDiskStatus(instance, None, device)
               for device in instance.disks]

      idict = {
        "name": instance.name,
        "config_state": config_state,
        "run_state": remote_state,
        "pnode": instance.primary_node,
        "snodes": instance.secondary_nodes,
        "os": instance.os,
        "memory": instance.memory,
        "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
        "disks": disks,
        "vcpus": instance.vcpus,
        }

      htkind = self.sstore.GetHypervisorType()
      if htkind == constants.HT_XEN_PVM30:
        idict["kernel_path"] = instance.kernel_path
        idict["initrd_path"] = instance.initrd_path

      if htkind == constants.HT_XEN_HVM31:
        idict["hvm_boot_order"] = instance.hvm_boot_order
        idict["hvm_acpi"] = instance.hvm_acpi
        idict["hvm_pae"] = instance.hvm_pae
        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path

      if htkind in constants.HTS_REQ_PORT:
        idict["vnc_bind_address"] = instance.vnc_bind_address
        idict["network_port"] = instance.network_port

      result[instance.name] = idict

    return result


class LUSetInstanceParams(LogicalUnit):
  """Modifies an instance's parameters.

  """
  HPATH = "instance-modify"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name"]
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, primary and secondaries.

    """
    args = dict()
    if self.mem:
      args['memory'] = self.mem
    if self.vcpus:
      args['vcpus'] = self.vcpus
    if self.do_ip or self.do_bridge or self.mac:
      if self.do_ip:
        ip = self.ip
      else:
        ip = self.instance.nics[0].ip
      if self.bridge:
        bridge = self.bridge
      else:
        bridge = self.instance.nics[0].bridge
      if self.mac:
        mac = self.mac
      else:
        mac = self.instance.nics[0].mac
      args['nics'] = [(ip, bridge, mac)]
    env = _BuildInstanceHookEnvByObject(self.instance, override=args)
    nl = [self.sstore.GetMasterNode(),
          self.instance.primary_node] + list(self.instance.secondary_nodes)
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the instance list against the existing names.

    """
    # FIXME: all the parameters could be checked before, in ExpandNames, or in
    # a separate CheckArguments function, if we implement one, so the operation
    # can be aborted without waiting for any lock, should it have an error...
    self.mem = getattr(self.op, "mem", None)
    self.vcpus = getattr(self.op, "vcpus", None)
    self.ip = getattr(self.op, "ip", None)
    self.mac = getattr(self.op, "mac", None)
    self.bridge = getattr(self.op, "bridge", None)
    self.kernel_path = getattr(self.op, "kernel_path", None)
    self.initrd_path = getattr(self.op, "initrd_path", None)
    self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
    self.hvm_pae = getattr(self.op, "hvm_pae", None)
    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
                 self.vnc_bind_address]
    if all_parms.count(None) == len(all_parms):
      raise errors.OpPrereqError("No changes submitted")
    if self.mem is not None:
      try:
        self.mem = int(self.mem)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid memory size: %s" % str(err))
    if self.vcpus is not None:
      try:
        self.vcpus = int(self.vcpus)
      except ValueError, err:
        raise errors.OpPrereqError("Invalid vcpus number: %s" % str(err))
    if self.ip is not None:
      self.do_ip = True
      if self.ip.lower() == "none":
        self.ip = None
      else:
        if not utils.IsValidIP(self.ip):
          raise errors.OpPrereqError("Invalid IP address '%s'." % self.ip)
    else:
      self.do_ip = False
    self.do_bridge = (self.bridge is not None)
    if self.mac is not None:
      if self.cfg.IsMacInUse(self.mac):
        raise errors.OpPrereqError('MAC address %s already in use in cluster' %
                                   self.mac)
      if not utils.IsValidMac(self.mac):
        raise errors.OpPrereqError('Invalid MAC address %s' % self.mac)

    if self.kernel_path is not None:
      self.do_kernel_path = True
      if self.kernel_path == constants.VALUE_NONE:
        raise errors.OpPrereqError("Can't set instance to no kernel")

      if self.kernel_path != constants.VALUE_DEFAULT:
        if not os.path.isabs(self.kernel_path):
          raise errors.OpPrereqError("The kernel path must be an absolute"
                                     " filename")
    else:
      self.do_kernel_path = False

    if self.initrd_path is not None:
      self.do_initrd_path = True
      if self.initrd_path not in (constants.VALUE_NONE,
                                  constants.VALUE_DEFAULT):
        if not os.path.isabs(self.initrd_path):
          raise errors.OpPrereqError("The initrd path must be an absolute"
                                     " filename")
    else:
      self.do_initrd_path = False

    # boot order verification
    if self.hvm_boot_order is not None:
      if self.hvm_boot_order != constants.VALUE_DEFAULT:
        if len(self.hvm_boot_order.strip("acdn")) != 0:
          raise errors.OpPrereqError("invalid boot order specified,"
                                     " must be one or more of [acdn]"
                                     " or 'default'")

    # hvm_cdrom_image_path verification
    if self.op.hvm_cdrom_image_path is not None:
      if not os.path.isabs(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The path to the HVM CDROM image must"
                                   " be an absolute path or None, not %s" %
                                   self.op.hvm_cdrom_image_path)
      if not os.path.isfile(self.op.hvm_cdrom_image_path):
        raise errors.OpPrereqError("The HVM CDROM image must either be a"
                                   " regular file or a symlink pointing to"
                                   " an existing regular file, not %s" %
                                   self.op.hvm_cdrom_image_path)

    # vnc_bind_address verification
    if self.op.vnc_bind_address is not None:
      if not utils.IsValidIP(self.op.vnc_bind_address):
        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
                                   " like a valid IP address" %
                                   self.op.vnc_bind_address)

    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Modifies an instance.

    All parameters take effect only at the next restart of the instance.

    """
    result = []
    instance = self.instance
    if self.mem:
      instance.memory = self.mem
      result.append(("mem", self.mem))
    if self.vcpus:
      instance.vcpus = self.vcpus
      result.append(("vcpus", self.vcpus))
    if self.do_ip:
      instance.nics[0].ip = self.ip
      result.append(("ip", self.ip))
    if self.bridge:
      instance.nics[0].bridge = self.bridge
      result.append(("bridge", self.bridge))
    if self.mac:
      instance.nics[0].mac = self.mac
      result.append(("mac", self.mac))
    if self.do_kernel_path:
      instance.kernel_path = self.kernel_path
      result.append(("kernel_path", self.kernel_path))
    if self.do_initrd_path:
      instance.initrd_path = self.initrd_path
      result.append(("initrd_path", self.initrd_path))
    if self.hvm_boot_order:
      if self.hvm_boot_order == constants.VALUE_DEFAULT:
        instance.hvm_boot_order = None
      else:
        instance.hvm_boot_order = self.hvm_boot_order
      result.append(("hvm_boot_order", self.hvm_boot_order))
    if self.hvm_acpi:
      instance.hvm_acpi = self.hvm_acpi
      result.append(("hvm_acpi", self.hvm_acpi))
    if self.hvm_pae:
      instance.hvm_pae = self.hvm_pae
      result.append(("hvm_pae", self.hvm_pae))
    if self.hvm_cdrom_image_path:
      instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
    if self.vnc_bind_address:
      instance.vnc_bind_address = self.vnc_bind_address
      result.append(("vnc_bind_address", self.vnc_bind_address))

    self.cfg.Update(instance)

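    # A sketch of the value returned below (values illustrative): a list
    # pairing each changed parameter with its new value, e.g.
    #   [("mem", 512), ("bridge", "xen-br1")]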
    return result


class LUQueryExports(NoHooksLU):
  """Query the exports list

  """
  _OP_REQP = ['nodes']

  def CheckPrereq(self):
    """Check that the nodelist contains only existing nodes.

    """
    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))

  def Exec(self, feedback_fn):
    """Compute the list of all the exported system images.

    Returns:
      a dictionary with the structure node->(export-list)
      where export-list is a list of the instances exported on
      that node.

    """
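    # Example of the returned structure (node and instance names are
    # hypothetical):
    #   {"node1.example.com": ["inst1.example.com"],
    #    "node2.example.com": []}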
    return rpc.call_export_list(self.nodes)


class LUExportInstance(LogicalUnit):
  """Export an instance to an image in the cluster.

  """
  HPATH = "instance-export"
  HTYPE = constants.HTYPE_INSTANCE
  _OP_REQP = ["instance_name", "target_node", "shutdown"]

  def BuildHooksEnv(self):
    """Build hooks env.

    This will run on the master, primary node and target node.

    """
    env = {
      "EXPORT_NODE": self.op.target_node,
      "EXPORT_DO_SHUTDOWN": self.op.shutdown,
      }
    env.update(_BuildInstanceHookEnvByObject(self.instance))
    nl = [self.sstore.GetMasterNode(), self.instance.primary_node,
          self.op.target_node]
    return env, nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance and node names are valid.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    self.instance = self.cfg.GetInstanceInfo(instance_name)
    if self.instance is None:
      raise errors.OpPrereqError("Instance '%s' not found" %
                                 self.op.instance_name)

    # node verification
    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)

    if self.dst_node is None:
      raise errors.OpPrereqError("Destination node '%s' is unknown." %
                                 self.op.target_node)
    self.op.target_node = self.dst_node.name

    # instance disk type verification
    for disk in self.instance.disks:
      if disk.dev_type == constants.LD_FILE:
        raise errors.OpPrereqError("Export not supported for instances with"
                                   " file-based disks")

  def Exec(self, feedback_fn):
    """Export an instance to an image in the cluster.

    """
    instance = self.instance
    dst_node = self.dst_node
    src_node = instance.primary_node
    if self.op.shutdown:
      # shutdown the instance, but not the disks
      if not rpc.call_instance_shutdown(src_node, instance):
        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
                                 (instance.name, src_node))

    vgname = self.cfg.GetVGName()

    snap_disks = []

    try:
      for disk in instance.disks:
        if disk.iv_name == "sda":
          # new_dev_name will be a snapshot of an lvm leaf of the one we passed
          new_dev_name = rpc.call_blockdev_snapshot(src_node, disk)

          if not new_dev_name:
            logger.Error("could not snapshot block device %s on node %s" %
                         (disk.logical_id[1], src_node))
          else:
            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
                                   logical_id=(vgname, new_dev_name),
                                   physical_id=(vgname, new_dev_name),
                                   iv_name=disk.iv_name)
            snap_disks.append(new_dev)

    finally:
      if self.op.shutdown and instance.status == "up":
        if not rpc.call_instance_start(src_node, instance, None):
          _ShutdownInstanceDisks(instance, self.cfg)
          raise errors.OpExecError("Could not start instance")

    # TODO: check for size

    for dev in snap_disks:
      if not rpc.call_snapshot_export(src_node, dev, dst_node.name, instance):
        logger.Error("could not export block device %s from node %s to node %s"
                     % (dev.logical_id[1], src_node, dst_node.name))
      if not rpc.call_blockdev_remove(src_node, dev):
        logger.Error("could not remove snapshot block device %s from node %s" %
                     (dev.logical_id[1], src_node))

    if not rpc.call_finalize_export(dst_node.name, instance, snap_disks):
      logger.Error("could not finalize export for instance %s on node %s" %
                   (instance.name, dst_node.name))

    nodelist = self.cfg.GetNodeList()
    nodelist.remove(dst_node.name)

    # on one-node clusters nodelist will be empty after the removal
    # if we proceed the backup would be removed because OpQueryExports
    # substitutes an empty list with the full cluster node list.
    if nodelist:
      op = opcodes.OpQueryExports(nodes=nodelist)
      exportlist = self.proc.ChainOpCode(op)
      for node in exportlist:
        if instance.name in exportlist[node]:
          if not rpc.call_export_remove(node, instance.name):
            logger.Error("could not remove older export for instance %s"
                         " on node %s" % (instance.name, node))


class LURemoveExport(NoHooksLU):
  """Remove exports related to the named instance.

  """
  _OP_REQP = ["instance_name"]

  def CheckPrereq(self):
    """Check prerequisites.
    """
    pass

  def Exec(self, feedback_fn):
    """Remove any export.

    """
    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
    # If the instance was not found we'll try with the name that was passed in.
    # This will only work if it was an FQDN, though.
    fqdn_warn = False
    if not instance_name:
      fqdn_warn = True
      instance_name = self.op.instance_name

    op = opcodes.OpQueryExports(nodes=[])
    exportlist = self.proc.ChainOpCode(op)
    found = False
    for node in exportlist:
      if instance_name in exportlist[node]:
        found = True
        if not rpc.call_export_remove(node, instance_name):
          logger.Error("could not remove export for instance %s"
                       " on node %s" % (instance_name, node))

    if fqdn_warn and not found:
      feedback_fn("Export not found. If trying to remove an export belonging"
                  " to a deleted instance please use its Fully Qualified"
                  " Domain Name.")


class TagsLU(NoHooksLU):
  """Generic tags LU.

  This is an abstract class which is the parent of all the other tags LUs.

  """
  def CheckPrereq(self):
    """Check prerequisites.

    """
    if self.op.kind == constants.TAG_CLUSTER:
      self.target = self.cfg.GetClusterInfo()
    elif self.op.kind == constants.TAG_NODE:
      name = self.cfg.ExpandNodeName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid node name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetNodeInfo(name)
    elif self.op.kind == constants.TAG_INSTANCE:
      name = self.cfg.ExpandInstanceName(self.op.name)
      if name is None:
        raise errors.OpPrereqError("Invalid instance name (%s)" %
                                   (self.op.name,))
      self.op.name = name
      self.target = self.cfg.GetInstanceInfo(name)
    else:
      raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                 str(self.op.kind))


class LUGetTags(TagsLU):
  """Returns the tags of a given object.

  """
  _OP_REQP = ["kind", "name"]

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    return list(self.target.GetTags())


class LUSearchTags(NoHooksLU):
  """Searches the tags for a given pattern.

  """
  _OP_REQP = ["pattern"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the pattern passed for validity by compiling it.

    """
    try:
      self.re = re.compile(self.op.pattern)
    except re.error, err:
      raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
                                 (self.op.pattern, err))

  def Exec(self, feedback_fn):
    """Returns the tag list.

    """
    cfg = self.cfg
    tgts = [("/cluster", cfg.GetClusterInfo())]
    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
    tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
    tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
    results = []
    for path, target in tgts:
      for tag in target.GetTags():
        if self.re.search(tag):
          results.append((path, tag))
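    # Example result for a pattern like "web" (tags hypothetical):
    #   [("/cluster", "web-cluster"),
    #    ("/instances/inst1.example.com", "webserver")]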
    return results


class LUAddTags(TagsLU):
  """Sets a tag on a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the type and length of the tag name and value.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)

  def Exec(self, feedback_fn):
    """Sets the tag.

    """
    try:
      for tag in self.op.tags:
        self.target.AddTag(tag)
    except errors.TagError, err:
      raise errors.OpExecError("Error while setting tag: %s" % str(err))
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUDelTags(TagsLU):
  """Delete a list of tags from a given object.

  """
  _OP_REQP = ["kind", "name", "tags"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that we have the given tag.

    """
    TagsLU.CheckPrereq(self)
    for tag in self.op.tags:
      objects.TaggableObject.ValidateTag(tag)
    del_tags = frozenset(self.op.tags)
    cur_tags = self.target.GetTags()
    if not del_tags <= cur_tags:
      diff_tags = del_tags - cur_tags
      diff_names = ["'%s'" % tag for tag in diff_tags]
      raise errors.OpPrereqError("Tag(s) %s not found" %
                                 (",".join(diff_names)))

  def Exec(self, feedback_fn):
    """Remove the tag from the object.

    """
    for tag in self.op.tags:
      self.target.RemoveTag(tag)
    try:
      self.cfg.Update(self.target)
    except errors.ConfigurationError:
      raise errors.OpRetryError("There has been a modification to the"
                                " config file and the operation has been"
                                " aborted. Please retry.")


class LUTestDelay(NoHooksLU):
  """Sleep for a specified amount of time.

  This LU sleeps on the master and/or nodes for a specified amount of
  time.

  """
  _OP_REQP = ["duration", "on_master", "on_nodes"]
  REQ_BGL = False

  def ExpandNames(self):
    """Expand names and set required locks.

    This expands the node list, if any.

    """
    self.needed_locks = {}
    if self.op.on_nodes:
      # _GetWantedNodes can be used here, but is not always appropriate to use
      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
      # more information.
      self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes

  def CheckPrereq(self):
    """Check prerequisites.

    """

  def Exec(self, feedback_fn):
    """Do the actual sleep.

    """
    if self.op.on_master:
      if not utils.TestDelay(self.op.duration):
        raise errors.OpExecError("Error during master delay test")
    if self.op.on_nodes:
      result = rpc.call_test_delay(self.op.on_nodes, self.op.duration)
      if not result:
        raise errors.OpExecError("Complete failure from rpc call")
      for node, node_result in result.items():
        if not node_result:
          raise errors.OpExecError("Failure during rpc call to node %s,"
                                   " result: %s" % (node, node_result))


class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg/sstore that are needed to query the cluster
    - input data (all members of the _KEYS class attribute are required)
    - four buffer attributes (in_text, in_data, out_text, out_data), that
      represent the input (to the external script) in text and data
      structure format, and the output from it, again in two formats
    - the result variables from the script (success, info, nodes) for
      easy usage

  """
  _ALLO_KEYS = [
    "mem_size", "disks", "disk_template",
    "os", "tags", "nics", "vcpus",
    ]
  _RELO_KEYS = [
    "relocate_from",
    ]

  def __init__(self, cfg, sstore, mode, name, **kwargs):
    self.cfg = cfg
    self.sstore = sstore
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # init all input fields so that pylint is happy
    self.mode = mode
    self.name = name
    self.mem_size = self.disks = self.disk_template = None
    self.os = self.tags = self.nics = self.vcpus = None
    self.relocate_from = None
    # computed fields
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.nodes = None
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      keyset = self._ALLO_KEYS
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
      keyset = self._RELO_KEYS
    else:
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                   " IAllocator" % self.mode)
    for key in kwargs:
      if key not in keyset:
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                     " IAllocator" % key)
      setattr(self, key, kwargs[key])
    for key in keyset:
      if key not in kwargs:
        raise errors.ProgrammerError("Missing input parameter '%s' to"
                                     " IAllocator" % key)
    self._BuildInputData()
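
    # Illustrative construction for the relocation mode (names are
    # hypothetical; the required keys are given by _RELO_KEYS above):
    #   ial = IAllocator(cfg, sstore,
    #                    mode=constants.IALLOCATOR_MODE_RELOC,
    #                    name="inst1.example.com",
    #                    relocate_from=["node2.example.com"])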

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation.

    """
    cfg = self.cfg
    # cluster data
    data = {
      "version": 1,
      "cluster_name": self.sstore.GetClusterName(),
      "cluster_tags": list(cfg.GetClusterInfo().GetTags()),
      "hypervisor_type": self.sstore.GetHypervisorType(),
      # we don't have job IDs
      }

    i_list = [cfg.GetInstanceInfo(iname) for iname in cfg.GetInstanceList()]

    # node data
    node_results = {}
    node_list = cfg.GetNodeList()
    node_data = rpc.call_node_info(node_list, cfg.GetVGName())
    for nname in node_list:
      ninfo = cfg.GetNodeInfo(nname)
      if nname not in node_data or not isinstance(node_data[nname], dict):
        raise errors.OpExecError("Can't get data for node %s" % nname)
      remote_info = node_data[nname]
      for attr in ['memory_total', 'memory_free', 'memory_dom0',
                   'vg_size', 'vg_free', 'cpu_total']:
        if attr not in remote_info:
          raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                   (nname, attr))
        try:
          remote_info[attr] = int(remote_info[attr])
        except ValueError, err:
          raise errors.OpExecError("Node '%s' returned invalid value for '%s':"
                                   " %s" % (nname, attr, str(err)))
      # compute memory used by primary instances
      i_p_mem = i_p_up_mem = 0
      for iinfo in i_list:
        if iinfo.primary_node == nname:
          i_p_mem += iinfo.memory
          if iinfo.status == "up":
            i_p_up_mem += iinfo.memory

      # compute memory used by instances
      pnr = {
        "tags": list(ninfo.GetTags()),
        "total_memory": remote_info['memory_total'],
        "reserved_memory": remote_info['memory_dom0'],
        "free_memory": remote_info['memory_free'],
        "i_pri_memory": i_p_mem,
        "i_pri_up_memory": i_p_up_mem,
        "total_disk": remote_info['vg_size'],
        "free_disk": remote_info['vg_free'],
        "primary_ip": ninfo.primary_ip,
        "secondary_ip": ninfo.secondary_ip,
        "total_cpus": remote_info['cpu_total'],
        }
      node_results[nname] = pnr
    data["nodes"] = node_results

    # instance data
    instance_data = {}
    for iinfo in i_list:
      nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
                  for n in iinfo.nics]
      pir = {
        "tags": list(iinfo.GetTags()),
        "should_run": iinfo.status == "up",
        "vcpus": iinfo.vcpus,
        "memory": iinfo.memory,
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{"size": dsk.size, "mode": "w"} for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        }
      instance_data[iinfo.name] = pir

    data["instances"] = instance_data
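
    # Shape of the structure assembled above (values illustrative):
    #   {"version": 1, "cluster_name": ..., "cluster_tags": [...],
    #    "hypervisor_type": ...,
    #    "nodes": {node_name: pnr, ...},
    #    "instances": {instance_name: pir, ...}}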
    self.in_data = data

  def _AddNewInstance(self):
    """Add new instance data to allocator structure.

    This in combination with _AllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    data = self.in_data
    if len(self.disks) != 2:
      raise errors.OpExecError("Only two-disk configurations supported")

    disk_space = _ComputeDiskSize(self.disk_template,
                                  self.disks[0]["size"], self.disks[1]["size"])

    if self.disk_template in constants.DTS_NET_MIRROR:
      self.required_nodes = 2
    else:
      self.required_nodes = 1
    request = {
      "type": "allocate",
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.mem_size,
      "disks": self.disks,
      "disk_space_total": disk_space,
      "nics": self.nics,
      "required_nodes": self.required_nodes,
      }
    data["request"] = request

  def _AddRelocateInstance(self):
    """Add relocate instance data to allocator structure.

    This in combination with _IAllocatorGetClusterData will create the
    correct structure needed as input for the allocator.

    The checks for the completeness of the opcode must have already been
    done.

    """
    instance = self.cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_NET_MIRROR:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances")

    if len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance does not have exactly one"
                                 " secondary node")

    self.required_nodes = 1

    disk_space = _ComputeDiskSize(instance.disk_template,
                                  instance.disks[0].size,
                                  instance.disks[1].size)

    request = {
      "type": "relocate",
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }
    self.in_data["request"] = request

  def _BuildInputData(self):
    """Build input data structures.

    """
    self._ComputeClusterData()

    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
      self._AddNewInstance()
    else:
      self._AddRelocateInstance()

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=rpc.call_iallocator_runner):
    """Run an instance allocator and return the results.

    """
    result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)

    if not isinstance(result, tuple) or len(result) != 4:
      raise errors.OpExecError("Invalid result from master iallocator runner")

    rcode, stdout, stderr, fail = result

    if rcode == constants.IARUN_NOTFOUND:
      raise errors.OpExecError("Can't find allocator '%s'" % name)
    elif rcode == constants.IARUN_FAILURE:
      raise errors.OpExecError("Instance allocator call failed: %s,"
                               " output: %s" %
                               (fail, stdout+stderr))
    self.out_text = stdout
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    """
    try:
      rdict = serializer.Load(self.out_text)
    except Exception, err:
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    for key in "success", "info", "nodes":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    if not isinstance(rdict["nodes"], list):
      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
                               " not a list")
    self.out_data = rdict
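
    # A minimal well-formed reply therefore looks like this in JSON form
    # (node names hypothetical):
    #   {"success": true, "info": "allocation ok",
    #    "nodes": ["node1.example.com", "node2.example.com"]}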


class LUTestAllocator(NoHooksLU):
  """Run allocator tests.

  This LU runs the allocator tests.

  """
  _OP_REQP = ["direction", "mode", "name"]

  def CheckPrereq(self):
    """Check prerequisites.

    This checks the opcode parameters depending on the direction and mode
    of the test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      for attr in ["name", "mem_size", "disks", "disk_template",
                   "os", "tags", "nics", "vcpus"]:
        if not hasattr(self.op, attr):
          raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
                                     attr)
      iname = self.cfg.ExpandInstanceName(self.op.name)
      if iname is not None:
        raise errors.OpPrereqError("Instance '%s' already in the cluster" %
                                   iname)
      if not isinstance(self.op.nics, list):
        raise errors.OpPrereqError("Invalid parameter 'nics'")
      for row in self.op.nics:
        if (not isinstance(row, dict) or
            "mac" not in row or
            "ip" not in row or
            "bridge" not in row):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'nics' parameter")
      if not isinstance(self.op.disks, list):
        raise errors.OpPrereqError("Invalid parameter 'disks'")
      if len(self.op.disks) != 2:
        raise errors.OpPrereqError("Only two-disk configurations supported")
      for row in self.op.disks:
        if (not isinstance(row, dict) or
            "size" not in row or
            not isinstance(row["size"], int) or
            "mode" not in row or
            row["mode"] not in ['r', 'w']):
          raise errors.OpPrereqError("Invalid contents of the"
                                     " 'disks' parameter")
    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
      if not hasattr(self.op, "name"):
        raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
      fname = self.cfg.ExpandInstanceName(self.op.name)
      if fname is None:
        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
                                   self.op.name)
      self.op.name = fname
      self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
    else:
      raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                 self.op.mode)

    if self.op.direction == constants.IALLOCATOR_DIR_OUT:
      if not hasattr(self.op, "allocator") or self.op.allocator is None:
        raise errors.OpPrereqError("Missing allocator name")
    elif self.op.direction != constants.IALLOCATOR_DIR_IN:
      raise errors.OpPrereqError("Wrong allocator test '%s'" %
                                 self.op.direction)

  def Exec(self, feedback_fn):
    """Run the allocator test.

    """
    if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       mem_size=self.op.mem_size,
                       disks=self.op.disks,
                       disk_template=self.op.disk_template,
                       os=self.op.os,
                       tags=self.op.tags,
                       nics=self.op.nics,
                       vcpus=self.op.vcpus,
                       )
    else:
      ial = IAllocator(self.cfg, self.sstore,
                       mode=self.op.mode,
                       name=self.op.name,
                       relocate_from=list(self.relocate_from),
                       )

    if self.op.direction == constants.IALLOCATOR_DIR_IN:
      result = ial.in_text
    else:
      ial.Run(self.op.allocator, validate=False)
      result = ial.out_text