Return cluster tags from LUQueryClusterInfo
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor_name': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURepairDiskSizes(NoHooksLU):
1329   """Verifies the cluster disks sizes.
1330
1331   """
1332   _OP_REQP = ["instances"]
1333   REQ_BGL = False
1334
1335   def ExpandNames(self):
1336
1337     if not isinstance(self.op.instances, list):
1338       raise errors.OpPrereqError("Invalid argument type 'instances'")
1339
1340     if self.op.instances:
1341       self.wanted_names = []
1342       for name in self.op.instances:
1343         full_name = self.cfg.ExpandInstanceName(name)
1344         if full_name is None:
1345           raise errors.OpPrereqError("Instance '%s' not known" % name)
1346         self.wanted_names.append(full_name)
1347       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
1348       self.needed_locks = {
1349         locking.LEVEL_NODE: [],
1350         locking.LEVEL_INSTANCE: self.wanted_names,
1351         }
1352       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1353     else:
1354       self.wanted_names = None
1355       self.needed_locks = {
1356         locking.LEVEL_NODE: locking.ALL_SET,
1357         locking.LEVEL_INSTANCE: locking.ALL_SET,
1358         }
1359     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1360
1361   def DeclareLocks(self, level):
1362     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1363       self._LockInstancesNodes(primary_only=True)
1364
1365   def CheckPrereq(self):
1366     """Check prerequisites.
1367
1368     This only checks the optional instance list against the existing names.
1369
1370     """
1371     if self.wanted_names is None:
1372       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1373
1374     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1375                              in self.wanted_names]
1376
1377   def Exec(self, feedback_fn):
1378     """Verify the size of cluster disks.
1379
1380     """
1381     # TODO: check child disks too
1382     # TODO: check differences in size between primary/secondary nodes
1383     per_node_disks = {}
1384     for instance in self.wanted_instances:
1385       pnode = instance.primary_node
1386       if pnode not in per_node_disks:
1387         per_node_disks[pnode] = []
1388       for idx, disk in enumerate(instance.disks):
1389         per_node_disks[pnode].append((instance, idx, disk))
1390
1391     changed = []
1392     for node, dskl in per_node_disks.items():
1393       result = self.rpc.call_blockdev_getsizes(node, [v[2] for v in dskl])
1394       if result.failed:
1395         self.LogWarning("Failure in blockdev_getsizes call to node"
1396                         " %s, ignoring", node)
1397         continue
1398       if len(result.data) != len(dskl):
1399         self.LogWarning("Invalid result from node %s, ignoring node results",
1400                         node)
1401         continue
1402       for ((instance, idx, disk), size) in zip(dskl, result.data):
1403         if size is None:
1404           self.LogWarning("Disk %d of instance %s did not return size"
1405                           " information, ignoring", idx, instance.name)
1406           continue
1407         if not isinstance(size, (int, long)):
1408           self.LogWarning("Disk %d of instance %s did not return valid"
1409                           " size information, ignoring", idx, instance.name)
1410           continue
1411         size = size >> 20
1412         if size != disk.size:
1413           self.LogInfo("Disk %d of instance %s has mismatched size,"
1414                        " correcting: recorded %d, actual %d", idx,
1415                        instance.name, disk.size, size)
1416           disk.size = size
1417           self.cfg.Update(instance)
1418           changed.append((instance.name, idx, size))
1419     return changed
1420
1421
1422 class LURenameCluster(LogicalUnit):
1423   """Rename the cluster.
1424
1425   """
1426   HPATH = "cluster-rename"
1427   HTYPE = constants.HTYPE_CLUSTER
1428   _OP_REQP = ["name"]
1429
1430   def BuildHooksEnv(self):
1431     """Build hooks env.
1432
1433     """
1434     env = {
1435       "OP_TARGET": self.cfg.GetClusterName(),
1436       "NEW_NAME": self.op.name,
1437       }
1438     mn = self.cfg.GetMasterNode()
1439     return env, [mn], [mn]
1440
1441   def CheckPrereq(self):
1442     """Verify that the passed name is a valid one.
1443
1444     """
1445     hostname = utils.HostInfo(self.op.name)
1446
1447     new_name = hostname.name
1448     self.ip = new_ip = hostname.ip
1449     old_name = self.cfg.GetClusterName()
1450     old_ip = self.cfg.GetMasterIP()
1451     if new_name == old_name and new_ip == old_ip:
1452       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1453                                  " cluster has changed")
1454     if new_ip != old_ip:
1455       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1456         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1457                                    " reachable on the network. Aborting." %
1458                                    new_ip)
1459
1460     self.op.name = new_name
1461
1462   def Exec(self, feedback_fn):
1463     """Rename the cluster.
1464
1465     """
1466     clustername = self.op.name
1467     ip = self.ip
1468
1469     # shutdown the master IP
1470     master = self.cfg.GetMasterNode()
1471     result = self.rpc.call_node_stop_master(master, False)
1472     if result.failed or not result.data:
1473       raise errors.OpExecError("Could not disable the master role")
1474
1475     try:
1476       cluster = self.cfg.GetClusterInfo()
1477       cluster.cluster_name = clustername
1478       cluster.master_ip = ip
1479       self.cfg.Update(cluster)
1480
1481       # update the known hosts file
1482       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1483       node_list = self.cfg.GetNodeList()
1484       try:
1485         node_list.remove(master)
1486       except ValueError:
1487         pass
1488       result = self.rpc.call_upload_file(node_list,
1489                                          constants.SSH_KNOWN_HOSTS_FILE)
1490       for to_node, to_result in result.iteritems():
1491         if to_result.failed or not to_result.data:
1492           logging.error("Copy of file %s to node %s failed",
1493                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1494
1495     finally:
1496       result = self.rpc.call_node_start_master(master, False, False)
1497       if result.failed or not result.data:
1498         self.LogWarning("Could not re-enable the master role on"
1499                         " the master, please restart manually.")
1500
1501
1502 def _RecursiveCheckIfLVMBased(disk):
1503   """Check if the given disk or its children are lvm-based.
1504
1505   @type disk: L{objects.Disk}
1506   @param disk: the disk to check
1507   @rtype: boolean
1508   @return: boolean indicating whether a LD_LV dev_type was found or not
1509
1510   """
1511   if disk.children:
1512     for chdisk in disk.children:
1513       if _RecursiveCheckIfLVMBased(chdisk):
1514         return True
1515   return disk.dev_type == constants.LD_LV
1516
1517
1518 class LUSetClusterParams(LogicalUnit):
1519   """Change the parameters of the cluster.
1520
1521   """
1522   HPATH = "cluster-modify"
1523   HTYPE = constants.HTYPE_CLUSTER
1524   _OP_REQP = []
1525   REQ_BGL = False
1526
1527   def CheckArguments(self):
1528     """Check parameters
1529
1530     """
1531     if not hasattr(self.op, "candidate_pool_size"):
1532       self.op.candidate_pool_size = None
1533     if self.op.candidate_pool_size is not None:
1534       try:
1535         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1536       except (ValueError, TypeError), err:
1537         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1538                                    str(err))
1539       if self.op.candidate_pool_size < 1:
1540         raise errors.OpPrereqError("At least one master candidate needed")
1541
1542   def ExpandNames(self):
1543     # FIXME: in the future maybe other cluster params won't require checking on
1544     # all nodes to be modified.
1545     self.needed_locks = {
1546       locking.LEVEL_NODE: locking.ALL_SET,
1547     }
1548     self.share_locks[locking.LEVEL_NODE] = 1
1549
1550   def BuildHooksEnv(self):
1551     """Build hooks env.
1552
1553     """
1554     env = {
1555       "OP_TARGET": self.cfg.GetClusterName(),
1556       "NEW_VG_NAME": self.op.vg_name,
1557       }
1558     mn = self.cfg.GetMasterNode()
1559     return env, [mn], [mn]
1560
1561   def CheckPrereq(self):
1562     """Check prerequisites.
1563
1564     This checks whether the given params don't conflict and
1565     if the given volume group is valid.
1566
1567     """
1568     if self.op.vg_name is not None and not self.op.vg_name:
1569       instances = self.cfg.GetAllInstancesInfo().values()
1570       for inst in instances:
1571         for disk in inst.disks:
1572           if _RecursiveCheckIfLVMBased(disk):
1573             raise errors.OpPrereqError("Cannot disable lvm storage while"
1574                                        " lvm-based instances exist")
1575
1576     node_list = self.acquired_locks[locking.LEVEL_NODE]
1577
1578     # if vg_name not None, checks given volume group on all nodes
1579     if self.op.vg_name:
1580       vglist = self.rpc.call_vg_list(node_list)
1581       for node in node_list:
1582         if vglist[node].failed:
1583           # ignoring down node
1584           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1585           continue
1586         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1587                                               self.op.vg_name,
1588                                               constants.MIN_VG_SIZE)
1589         if vgstatus:
1590           raise errors.OpPrereqError("Error on node '%s': %s" %
1591                                      (node, vgstatus))
1592
1593     self.cluster = cluster = self.cfg.GetClusterInfo()
1594     # validate beparams changes
1595     if self.op.beparams:
1596       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1597       self.new_beparams = cluster.FillDict(
1598         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1599
1600     # hypervisor list/parameters
1601     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1602     if self.op.hvparams:
1603       if not isinstance(self.op.hvparams, dict):
1604         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1605       for hv_name, hv_dict in self.op.hvparams.items():
1606         if hv_name not in self.new_hvparams:
1607           self.new_hvparams[hv_name] = hv_dict
1608         else:
1609           self.new_hvparams[hv_name].update(hv_dict)
1610
1611     if self.op.enabled_hypervisors is not None:
1612       self.hv_list = self.op.enabled_hypervisors
1613       if not self.hv_list:
1614         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1615                                    " least one member")
1616       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1617       if invalid_hvs:
1618         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1619                                    " entries: %s" % invalid_hvs)
1620     else:
1621       self.hv_list = cluster.enabled_hypervisors
1622
1623     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1624       # either the enabled list has changed, or the parameters have, validate
1625       for hv_name, hv_params in self.new_hvparams.items():
1626         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1627             (self.op.enabled_hypervisors and
1628              hv_name in self.op.enabled_hypervisors)):
1629           # either this is a new hypervisor, or its parameters have changed
1630           hv_class = hypervisor.GetHypervisor(hv_name)
1631           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1632           hv_class.CheckParameterSyntax(hv_params)
1633           _CheckHVParams(self, node_list, hv_name, hv_params)
1634
1635   def Exec(self, feedback_fn):
1636     """Change the parameters of the cluster.
1637
1638     """
1639     if self.op.vg_name is not None:
1640       new_volume = self.op.vg_name
1641       if not new_volume:
1642         new_volume = None
1643       if new_volume != self.cfg.GetVGName():
1644         self.cfg.SetVGName(new_volume)
1645       else:
1646         feedback_fn("Cluster LVM configuration already in desired"
1647                     " state, not changing")
1648     if self.op.hvparams:
1649       self.cluster.hvparams = self.new_hvparams
1650     if self.op.enabled_hypervisors is not None:
1651       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1652     if self.op.beparams:
1653       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1654     if self.op.candidate_pool_size is not None:
1655       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1656       # we need to update the pool size here, otherwise the save will fail
1657       _AdjustCandidatePool(self)
1658
1659     self.cfg.Update(self.cluster)
1660
1661
1662 class LURedistributeConfig(NoHooksLU):
1663   """Force the redistribution of cluster configuration.
1664
1665   This is a very simple LU.
1666
1667   """
1668   _OP_REQP = []
1669   REQ_BGL = False
1670
1671   def ExpandNames(self):
1672     self.needed_locks = {
1673       locking.LEVEL_NODE: locking.ALL_SET,
1674     }
1675     self.share_locks[locking.LEVEL_NODE] = 1
1676
1677   def CheckPrereq(self):
1678     """Check prerequisites.
1679
1680     """
1681
1682   def Exec(self, feedback_fn):
1683     """Redistribute the configuration.
1684
1685     """
1686     self.cfg.Update(self.cfg.GetClusterInfo())
1687
1688
1689 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1690   """Sleep and poll for an instance's disk to sync.
1691
1692   """
1693   if not instance.disks:
1694     return True
1695
1696   if not oneshot:
1697     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1698
1699   node = instance.primary_node
1700
1701   for dev in instance.disks:
1702     lu.cfg.SetDiskID(dev, node)
1703
1704   retries = 0
1705   degr_retries = 10 # in seconds, as we sleep 1 second each time
1706   while True:
1707     max_time = 0
1708     done = True
1709     cumul_degraded = False
1710     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1711     if rstats.failed or not rstats.data:
1712       lu.LogWarning("Can't get any data from node %s", node)
1713       retries += 1
1714       if retries >= 10:
1715         raise errors.RemoteError("Can't contact node %s for mirror data,"
1716                                  " aborting." % node)
1717       time.sleep(6)
1718       continue
1719     rstats = rstats.data
1720     retries = 0
1721     for i, mstat in enumerate(rstats):
1722       if mstat is None:
1723         lu.LogWarning("Can't compute data for node %s/%s",
1724                            node, instance.disks[i].iv_name)
1725         continue
1726       # we ignore the ldisk parameter
1727       perc_done, est_time, is_degraded, _ = mstat
1728       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1729       if perc_done is not None:
1730         done = False
1731         if est_time is not None:
1732           rem_time = "%d estimated seconds remaining" % est_time
1733           max_time = est_time
1734         else:
1735           rem_time = "no time estimate"
1736         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1737                         (instance.disks[i].iv_name, perc_done, rem_time))
1738
1739     # if we're done but degraded, let's do a few small retries, to
1740     # make sure we see a stable and not transient situation; therefore
1741     # we force restart of the loop
1742     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1743       logging.info("Degraded disks found, %d retries left", degr_retries)
1744       degr_retries -= 1
1745       time.sleep(1)
1746       continue
1747
1748     if done or oneshot:
1749       break
1750
1751     time.sleep(min(60, max_time))
1752
1753   if done:
1754     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1755   return not cumul_degraded
1756
1757
1758 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1759   """Check that mirrors are not degraded.
1760
1761   The ldisk parameter, if True, will change the test from the
1762   is_degraded attribute (which represents overall non-ok status for
1763   the device(s)) to the ldisk (representing the local storage status).
1764
1765   """
1766   lu.cfg.SetDiskID(dev, node)
1767   if ldisk:
1768     idx = 6
1769   else:
1770     idx = 5
1771
1772   result = True
1773   if on_primary or dev.AssembleOnSecondary():
1774     rstats = lu.rpc.call_blockdev_find(node, dev)
1775     msg = rstats.RemoteFailMsg()
1776     if msg:
1777       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1778       result = False
1779     elif not rstats.payload:
1780       lu.LogWarning("Can't find disk on node %s", node)
1781       result = False
1782     else:
1783       result = result and (not rstats.payload[idx])
1784   if dev.children:
1785     for child in dev.children:
1786       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1787
1788   return result
1789
1790
1791 class LUDiagnoseOS(NoHooksLU):
1792   """Logical unit for OS diagnose/query.
1793
1794   """
1795   _OP_REQP = ["output_fields", "names"]
1796   REQ_BGL = False
1797   _FIELDS_STATIC = utils.FieldSet()
1798   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1799
1800   def ExpandNames(self):
1801     if self.op.names:
1802       raise errors.OpPrereqError("Selective OS query not supported")
1803
1804     _CheckOutputFields(static=self._FIELDS_STATIC,
1805                        dynamic=self._FIELDS_DYNAMIC,
1806                        selected=self.op.output_fields)
1807
1808     # Lock all nodes, in shared mode
1809     # Temporary removal of locks, should be reverted later
1810     # TODO: reintroduce locks when they are lighter-weight
1811     self.needed_locks = {}
1812     #self.share_locks[locking.LEVEL_NODE] = 1
1813     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814
1815   def CheckPrereq(self):
1816     """Check prerequisites.
1817
1818     """
1819
1820   @staticmethod
1821   def _DiagnoseByOS(node_list, rlist):
1822     """Remaps a per-node return list into an a per-os per-node dictionary
1823
1824     @param node_list: a list with the names of all nodes
1825     @param rlist: a map with node names as keys and OS objects as values
1826
1827     @rtype: dict
1828     @return: a dictionary with osnames as keys and as value another map, with
1829         nodes as keys and list of OS objects as values, eg::
1830
1831           {"debian-etch": {"node1": [<object>,...],
1832                            "node2": [<object>,]}
1833           }
1834
1835     """
1836     all_os = {}
1837     # we build here the list of nodes that didn't fail the RPC (at RPC
1838     # level), so that nodes with a non-responding node daemon don't
1839     # make all OSes invalid
1840     good_nodes = [node_name for node_name in rlist
1841                   if not rlist[node_name].failed]
1842     for node_name, nr in rlist.iteritems():
1843       if nr.failed or not nr.data:
1844         continue
1845       for os_obj in nr.data:
1846         if os_obj.name not in all_os:
1847           # build a list of nodes for this os containing empty lists
1848           # for each node in node_list
1849           all_os[os_obj.name] = {}
1850           for nname in good_nodes:
1851             all_os[os_obj.name][nname] = []
1852         all_os[os_obj.name][node_name].append(os_obj)
1853     return all_os
1854
1855   def Exec(self, feedback_fn):
1856     """Compute the list of OSes.
1857
1858     """
1859     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1860     node_data = self.rpc.call_os_diagnose(valid_nodes)
1861     if node_data == False:
1862       raise errors.OpExecError("Can't gather the list of OSes")
1863     pol = self._DiagnoseByOS(valid_nodes, node_data)
1864     output = []
1865     for os_name, os_data in pol.iteritems():
1866       row = []
1867       for field in self.op.output_fields:
1868         if field == "name":
1869           val = os_name
1870         elif field == "valid":
1871           val = utils.all([osl and osl[0] for osl in os_data.values()])
1872         elif field == "node_status":
1873           val = {}
1874           for node_name, nos_list in os_data.iteritems():
1875             val[node_name] = [(v.status, v.path) for v in nos_list]
1876         else:
1877           raise errors.ParameterError(field)
1878         row.append(val)
1879       output.append(row)
1880
1881     return output
1882
1883
1884 class LURemoveNode(LogicalUnit):
1885   """Logical unit for removing a node.
1886
1887   """
1888   HPATH = "node-remove"
1889   HTYPE = constants.HTYPE_NODE
1890   _OP_REQP = ["node_name"]
1891
1892   def BuildHooksEnv(self):
1893     """Build hooks env.
1894
1895     This doesn't run on the target node in the pre phase as a failed
1896     node would then be impossible to remove.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.op.node_name,
1901       "NODE_NAME": self.op.node_name,
1902       }
1903     all_nodes = self.cfg.GetNodeList()
1904     all_nodes.remove(self.op.node_name)
1905     return env, all_nodes, all_nodes
1906
1907   def CheckPrereq(self):
1908     """Check prerequisites.
1909
1910     This checks:
1911      - the node exists in the configuration
1912      - it does not have primary or secondary instances
1913      - it's not the master
1914
1915     Any errors are signaled by raising errors.OpPrereqError.
1916
1917     """
1918     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1919     if node is None:
1920       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1921
1922     instance_list = self.cfg.GetInstanceList()
1923
1924     masternode = self.cfg.GetMasterNode()
1925     if node.name == masternode:
1926       raise errors.OpPrereqError("Node is the master node,"
1927                                  " you need to failover first.")
1928
1929     for instance_name in instance_list:
1930       instance = self.cfg.GetInstanceInfo(instance_name)
1931       if node.name in instance.all_nodes:
1932         raise errors.OpPrereqError("Instance %s is still running on the node,"
1933                                    " please remove first." % instance_name)
1934     self.op.node_name = node.name
1935     self.node = node
1936
1937   def Exec(self, feedback_fn):
1938     """Removes the node from the cluster.
1939
1940     """
1941     node = self.node
1942     logging.info("Stopping the node daemon and removing configs from node %s",
1943                  node.name)
1944
1945     self.context.RemoveNode(node.name)
1946
1947     self.rpc.call_node_leave_cluster(node.name)
1948
1949     # Promote nodes to master candidate as needed
1950     _AdjustCandidatePool(self)
1951
1952
1953 class LUQueryNodes(NoHooksLU):
1954   """Logical unit for querying nodes.
1955
1956   """
1957   _OP_REQP = ["output_fields", "names", "use_locking"]
1958   REQ_BGL = False
1959   _FIELDS_DYNAMIC = utils.FieldSet(
1960     "dtotal", "dfree",
1961     "mtotal", "mnode", "mfree",
1962     "bootid",
1963     "ctotal", "cnodes", "csockets",
1964     )
1965
1966   _FIELDS_STATIC = utils.FieldSet(
1967     "name", "pinst_cnt", "sinst_cnt",
1968     "pinst_list", "sinst_list",
1969     "pip", "sip", "tags",
1970     "serial_no",
1971     "master_candidate",
1972     "master",
1973     "offline",
1974     "drained",
1975     "role",
1976     )
1977
1978   def ExpandNames(self):
1979     _CheckOutputFields(static=self._FIELDS_STATIC,
1980                        dynamic=self._FIELDS_DYNAMIC,
1981                        selected=self.op.output_fields)
1982
1983     self.needed_locks = {}
1984     self.share_locks[locking.LEVEL_NODE] = 1
1985
1986     if self.op.names:
1987       self.wanted = _GetWantedNodes(self, self.op.names)
1988     else:
1989       self.wanted = locking.ALL_SET
1990
1991     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1992     self.do_locking = self.do_node_query and self.op.use_locking
1993     if self.do_locking:
1994       # if we don't request only static fields, we need to lock the nodes
1995       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1996
1997
1998   def CheckPrereq(self):
1999     """Check prerequisites.
2000
2001     """
2002     # The validation of the node list is done in the _GetWantedNodes,
2003     # if non empty, and if empty, there's no validation to do
2004     pass
2005
2006   def Exec(self, feedback_fn):
2007     """Computes the list of nodes and their attributes.
2008
2009     """
2010     all_info = self.cfg.GetAllNodesInfo()
2011     if self.do_locking:
2012       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2013     elif self.wanted != locking.ALL_SET:
2014       nodenames = self.wanted
2015       missing = set(nodenames).difference(all_info.keys())
2016       if missing:
2017         raise errors.OpExecError(
2018           "Some nodes were removed before retrieving their data: %s" % missing)
2019     else:
2020       nodenames = all_info.keys()
2021
2022     nodenames = utils.NiceSort(nodenames)
2023     nodelist = [all_info[name] for name in nodenames]
2024
2025     # begin data gathering
2026
2027     if self.do_node_query:
2028       live_data = {}
2029       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2030                                           self.cfg.GetHypervisorType())
2031       for name in nodenames:
2032         nodeinfo = node_data[name]
2033         if not nodeinfo.failed and nodeinfo.data:
2034           nodeinfo = nodeinfo.data
2035           fn = utils.TryConvert
2036           live_data[name] = {
2037             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2038             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2039             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2040             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2041             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2042             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2043             "bootid": nodeinfo.get('bootid', None),
2044             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2045             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2046             }
2047         else:
2048           live_data[name] = {}
2049     else:
2050       live_data = dict.fromkeys(nodenames, {})
2051
2052     node_to_primary = dict([(name, set()) for name in nodenames])
2053     node_to_secondary = dict([(name, set()) for name in nodenames])
2054
2055     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2056                              "sinst_cnt", "sinst_list"))
2057     if inst_fields & frozenset(self.op.output_fields):
2058       instancelist = self.cfg.GetInstanceList()
2059
2060       for instance_name in instancelist:
2061         inst = self.cfg.GetInstanceInfo(instance_name)
2062         if inst.primary_node in node_to_primary:
2063           node_to_primary[inst.primary_node].add(inst.name)
2064         for secnode in inst.secondary_nodes:
2065           if secnode in node_to_secondary:
2066             node_to_secondary[secnode].add(inst.name)
2067
2068     master_node = self.cfg.GetMasterNode()
2069
2070     # end data gathering
2071
2072     output = []
2073     for node in nodelist:
2074       node_output = []
2075       for field in self.op.output_fields:
2076         if field == "name":
2077           val = node.name
2078         elif field == "pinst_list":
2079           val = list(node_to_primary[node.name])
2080         elif field == "sinst_list":
2081           val = list(node_to_secondary[node.name])
2082         elif field == "pinst_cnt":
2083           val = len(node_to_primary[node.name])
2084         elif field == "sinst_cnt":
2085           val = len(node_to_secondary[node.name])
2086         elif field == "pip":
2087           val = node.primary_ip
2088         elif field == "sip":
2089           val = node.secondary_ip
2090         elif field == "tags":
2091           val = list(node.GetTags())
2092         elif field == "serial_no":
2093           val = node.serial_no
2094         elif field == "master_candidate":
2095           val = node.master_candidate
2096         elif field == "master":
2097           val = node.name == master_node
2098         elif field == "offline":
2099           val = node.offline
2100         elif field == "drained":
2101           val = node.drained
2102         elif self._FIELDS_DYNAMIC.Matches(field):
2103           val = live_data[node.name].get(field, None)
2104         elif field == "role":
2105           if node.name == master_node:
2106             val = "M"
2107           elif node.master_candidate:
2108             val = "C"
2109           elif node.drained:
2110             val = "D"
2111           elif node.offline:
2112             val = "O"
2113           else:
2114             val = "R"
2115         else:
2116           raise errors.ParameterError(field)
2117         node_output.append(val)
2118       output.append(node_output)
2119
2120     return output
2121
2122
2123 class LUQueryNodeVolumes(NoHooksLU):
2124   """Logical unit for getting volumes on node(s).
2125
2126   """
2127   _OP_REQP = ["nodes", "output_fields"]
2128   REQ_BGL = False
2129   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2130   _FIELDS_STATIC = utils.FieldSet("node")
2131
2132   def ExpandNames(self):
2133     _CheckOutputFields(static=self._FIELDS_STATIC,
2134                        dynamic=self._FIELDS_DYNAMIC,
2135                        selected=self.op.output_fields)
2136
2137     self.needed_locks = {}
2138     self.share_locks[locking.LEVEL_NODE] = 1
2139     if not self.op.nodes:
2140       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2141     else:
2142       self.needed_locks[locking.LEVEL_NODE] = \
2143         _GetWantedNodes(self, self.op.nodes)
2144
2145   def CheckPrereq(self):
2146     """Check prerequisites.
2147
2148     This checks that the fields required are valid output fields.
2149
2150     """
2151     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2152
2153   def Exec(self, feedback_fn):
2154     """Computes the list of nodes and their attributes.
2155
2156     """
2157     nodenames = self.nodes
2158     volumes = self.rpc.call_node_volumes(nodenames)
2159
2160     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2161              in self.cfg.GetInstanceList()]
2162
2163     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2164
2165     output = []
2166     for node in nodenames:
2167       if node not in volumes or volumes[node].failed or not volumes[node].data:
2168         continue
2169
2170       node_vols = volumes[node].data[:]
2171       node_vols.sort(key=lambda vol: vol['dev'])
2172
2173       for vol in node_vols:
2174         node_output = []
2175         for field in self.op.output_fields:
2176           if field == "node":
2177             val = node
2178           elif field == "phys":
2179             val = vol['dev']
2180           elif field == "vg":
2181             val = vol['vg']
2182           elif field == "name":
2183             val = vol['name']
2184           elif field == "size":
2185             val = int(float(vol['size']))
2186           elif field == "instance":
2187             for inst in ilist:
2188               if node not in lv_by_node[inst]:
2189                 continue
2190               if vol['name'] in lv_by_node[inst][node]:
2191                 val = inst.name
2192                 break
2193             else:
2194               val = '-'
2195           else:
2196             raise errors.ParameterError(field)
2197           node_output.append(str(val))
2198
2199         output.append(node_output)
2200
2201     return output
2202
2203
2204 class LUAddNode(LogicalUnit):
2205   """Logical unit for adding node to the cluster.
2206
2207   """
2208   HPATH = "node-add"
2209   HTYPE = constants.HTYPE_NODE
2210   _OP_REQP = ["node_name"]
2211
2212   def BuildHooksEnv(self):
2213     """Build hooks env.
2214
2215     This will run on all nodes before, and on all nodes + the new node after.
2216
2217     """
2218     env = {
2219       "OP_TARGET": self.op.node_name,
2220       "NODE_NAME": self.op.node_name,
2221       "NODE_PIP": self.op.primary_ip,
2222       "NODE_SIP": self.op.secondary_ip,
2223       }
2224     nodes_0 = self.cfg.GetNodeList()
2225     nodes_1 = nodes_0 + [self.op.node_name, ]
2226     return env, nodes_0, nodes_1
2227
2228   def CheckPrereq(self):
2229     """Check prerequisites.
2230
2231     This checks:
2232      - the new node is not already in the config
2233      - it is resolvable
2234      - its parameters (single/dual homed) matches the cluster
2235
2236     Any errors are signaled by raising errors.OpPrereqError.
2237
2238     """
2239     node_name = self.op.node_name
2240     cfg = self.cfg
2241
2242     dns_data = utils.HostInfo(node_name)
2243
2244     node = dns_data.name
2245     primary_ip = self.op.primary_ip = dns_data.ip
2246     secondary_ip = getattr(self.op, "secondary_ip", None)
2247     if secondary_ip is None:
2248       secondary_ip = primary_ip
2249     if not utils.IsValidIP(secondary_ip):
2250       raise errors.OpPrereqError("Invalid secondary IP given")
2251     self.op.secondary_ip = secondary_ip
2252
2253     node_list = cfg.GetNodeList()
2254     if not self.op.readd and node in node_list:
2255       raise errors.OpPrereqError("Node %s is already in the configuration" %
2256                                  node)
2257     elif self.op.readd and node not in node_list:
2258       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2259
2260     for existing_node_name in node_list:
2261       existing_node = cfg.GetNodeInfo(existing_node_name)
2262
2263       if self.op.readd and node == existing_node_name:
2264         if (existing_node.primary_ip != primary_ip or
2265             existing_node.secondary_ip != secondary_ip):
2266           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2267                                      " address configuration as before")
2268         continue
2269
2270       if (existing_node.primary_ip == primary_ip or
2271           existing_node.secondary_ip == primary_ip or
2272           existing_node.primary_ip == secondary_ip or
2273           existing_node.secondary_ip == secondary_ip):
2274         raise errors.OpPrereqError("New node ip address(es) conflict with"
2275                                    " existing node %s" % existing_node.name)
2276
2277     # check that the type of the node (single versus dual homed) is the
2278     # same as for the master
2279     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2280     master_singlehomed = myself.secondary_ip == myself.primary_ip
2281     newbie_singlehomed = secondary_ip == primary_ip
2282     if master_singlehomed != newbie_singlehomed:
2283       if master_singlehomed:
2284         raise errors.OpPrereqError("The master has no private ip but the"
2285                                    " new node has one")
2286       else:
2287         raise errors.OpPrereqError("The master has a private ip but the"
2288                                    " new node doesn't have one")
2289
2290     # checks reachability
2291     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2292       raise errors.OpPrereqError("Node not reachable by ping")
2293
2294     if not newbie_singlehomed:
2295       # check reachability from my secondary ip to newbie's secondary ip
2296       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2297                            source=myself.secondary_ip):
2298         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2299                                    " based ping to noded port")
2300
2301     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2302     if self.op.readd:
2303       exceptions = [node]
2304     else:
2305       exceptions = []
2306     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2307     # the new node will increase mc_max with one, so:
2308     mc_max = min(mc_max + 1, cp_size)
2309     self.master_candidate = mc_now < mc_max
2310
2311     if self.op.readd:
2312       self.new_node = self.cfg.GetNodeInfo(node)
2313       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2314     else:
2315       self.new_node = objects.Node(name=node,
2316                                    primary_ip=primary_ip,
2317                                    secondary_ip=secondary_ip,
2318                                    master_candidate=self.master_candidate,
2319                                    offline=False, drained=False)
2320
2321   def Exec(self, feedback_fn):
2322     """Adds the new node to the cluster.
2323
2324     """
2325     new_node = self.new_node
2326     node = new_node.name
2327
2328     # for re-adds, reset the offline/drained/master-candidate flags;
2329     # we need to reset here, otherwise offline would prevent RPC calls
2330     # later in the procedure; this also means that if the re-add
2331     # fails, we are left with a non-offlined, broken node
2332     if self.op.readd:
2333       new_node.drained = new_node.offline = False
2334       self.LogInfo("Readding a node, the offline/drained flags were reset")
2335       # if we demote the node, we do cleanup later in the procedure
2336       new_node.master_candidate = self.master_candidate
2337
2338     # notify the user about any possible mc promotion
2339     if new_node.master_candidate:
2340       self.LogInfo("Node will be a master candidate")
2341
2342     # check connectivity
2343     result = self.rpc.call_version([node])[node]
2344     result.Raise()
2345     if result.data:
2346       if constants.PROTOCOL_VERSION == result.data:
2347         logging.info("Communication to node %s fine, sw version %s match",
2348                      node, result.data)
2349       else:
2350         raise errors.OpExecError("Version mismatch master version %s,"
2351                                  " node version %s" %
2352                                  (constants.PROTOCOL_VERSION, result.data))
2353     else:
2354       raise errors.OpExecError("Cannot get version from the new node")
2355
2356     # setup ssh on node
2357     logging.info("Copy ssh key to node %s", node)
2358     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2359     keyarray = []
2360     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2361                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2362                 priv_key, pub_key]
2363
2364     for i in keyfiles:
2365       f = open(i, 'r')
2366       try:
2367         keyarray.append(f.read())
2368       finally:
2369         f.close()
2370
2371     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2372                                     keyarray[2],
2373                                     keyarray[3], keyarray[4], keyarray[5])
2374
2375     msg = result.RemoteFailMsg()
2376     if msg:
2377       raise errors.OpExecError("Cannot transfer ssh keys to the"
2378                                " new node: %s" % msg)
2379
2380     # Add node to our /etc/hosts, and add key to known_hosts
2381     if self.cfg.GetClusterInfo().modify_etc_hosts:
2382       utils.AddHostToEtcHosts(new_node.name)
2383
2384     if new_node.secondary_ip != new_node.primary_ip:
2385       result = self.rpc.call_node_has_ip_address(new_node.name,
2386                                                  new_node.secondary_ip)
2387       if result.failed or not result.data:
2388         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2389                                  " you gave (%s). Please fix and re-run this"
2390                                  " command." % new_node.secondary_ip)
2391
2392     node_verify_list = [self.cfg.GetMasterNode()]
2393     node_verify_param = {
2394       'nodelist': [node],
2395       # TODO: do a node-net-test as well?
2396     }
2397
2398     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2399                                        self.cfg.GetClusterName())
2400     for verifier in node_verify_list:
2401       if result[verifier].failed or not result[verifier].data:
2402         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2403                                  " for remote verification" % verifier)
2404       if result[verifier].data['nodelist']:
2405         for failed in result[verifier].data['nodelist']:
2406           feedback_fn("ssh/hostname verification failed %s -> %s" %
2407                       (verifier, result[verifier].data['nodelist'][failed]))
2408         raise errors.OpExecError("ssh/hostname verification failed.")
2409
2410     # Distribute updated /etc/hosts and known_hosts to all nodes,
2411     # including the node just added
2412     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2413     dist_nodes = self.cfg.GetNodeList()
2414     if not self.op.readd:
2415       dist_nodes.append(node)
2416     if myself.name in dist_nodes:
2417       dist_nodes.remove(myself.name)
2418
2419     logging.debug("Copying hosts and known_hosts to all nodes")
2420     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2421       result = self.rpc.call_upload_file(dist_nodes, fname)
2422       for to_node, to_result in result.iteritems():
2423         if to_result.failed or not to_result.data:
2424           logging.error("Copy of file %s to node %s failed", fname, to_node)
2425
2426     to_copy = []
2427     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2428     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2429       to_copy.append(constants.VNC_PASSWORD_FILE)
2430
2431     for fname in to_copy:
2432       result = self.rpc.call_upload_file([node], fname)
2433       if result[node].failed or not result[node]:
2434         logging.error("Could not copy file %s to node %s", fname, node)
2435
2436     if self.op.readd:
2437       self.context.ReaddNode(new_node)
2438       # make sure we redistribute the config
2439       self.cfg.Update(new_node)
2440       # and make sure the new node will not have old files around
2441       if not new_node.master_candidate:
2442         result = self.rpc.call_node_demote_from_mc(new_node.name)
2443         msg = result.RemoteFailMsg()
2444         if msg:
2445           self.LogWarning("Node failed to demote itself from master"
2446                           " candidate status: %s" % msg)
2447     else:
2448       self.context.AddNode(new_node)
2449
2450
2451 class LUSetNodeParams(LogicalUnit):
2452   """Modifies the parameters of a node.
2453
2454   """
2455   HPATH = "node-modify"
2456   HTYPE = constants.HTYPE_NODE
2457   _OP_REQP = ["node_name"]
2458   REQ_BGL = False
2459
2460   def CheckArguments(self):
2461     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2462     if node_name is None:
2463       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2464     self.op.node_name = node_name
2465     _CheckBooleanOpField(self.op, 'master_candidate')
2466     _CheckBooleanOpField(self.op, 'offline')
2467     _CheckBooleanOpField(self.op, 'drained')
2468     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2469     if all_mods.count(None) == 3:
2470       raise errors.OpPrereqError("Please pass at least one modification")
2471     if all_mods.count(True) > 1:
2472       raise errors.OpPrereqError("Can't set the node into more than one"
2473                                  " state at the same time")
2474
2475   def ExpandNames(self):
2476     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2477
2478   def BuildHooksEnv(self):
2479     """Build hooks env.
2480
2481     This runs on the master node.
2482
2483     """
2484     env = {
2485       "OP_TARGET": self.op.node_name,
2486       "MASTER_CANDIDATE": str(self.op.master_candidate),
2487       "OFFLINE": str(self.op.offline),
2488       "DRAINED": str(self.op.drained),
2489       }
2490     nl = [self.cfg.GetMasterNode(),
2491           self.op.node_name]
2492     return env, nl, nl
2493
2494   def CheckPrereq(self):
2495     """Check prerequisites.
2496
2497     This only checks the instance list against the existing names.
2498
2499     """
2500     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2501
2502     if (self.op.master_candidate is not None or
2503         self.op.drained is not None or
2504         self.op.offline is not None):
2505       # we can't change the master's node flags
2506       if self.op.node_name == self.cfg.GetMasterNode():
2507         raise errors.OpPrereqError("The master role can be changed"
2508                                    " only via masterfailover")
2509
2510     if ((self.op.master_candidate == False or self.op.offline == True or
2511          self.op.drained == True) and node.master_candidate):
2512       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2513       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2514       if num_candidates <= cp_size:
2515         msg = ("Not enough master candidates (desired"
2516                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2517         if self.op.force:
2518           self.LogWarning(msg)
2519         else:
2520           raise errors.OpPrereqError(msg)
2521
2522     if (self.op.master_candidate == True and
2523         ((node.offline and not self.op.offline == False) or
2524          (node.drained and not self.op.drained == False))):
2525       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2526                                  " to master_candidate" % node.name)
2527
2528     return
2529
2530   def Exec(self, feedback_fn):
2531     """Modifies a node.
2532
2533     """
2534     node = self.node
2535
2536     result = []
2537     changed_mc = False
2538
2539     if self.op.offline is not None:
2540       node.offline = self.op.offline
2541       result.append(("offline", str(self.op.offline)))
2542       if self.op.offline == True:
2543         if node.master_candidate:
2544           node.master_candidate = False
2545           changed_mc = True
2546           result.append(("master_candidate", "auto-demotion due to offline"))
2547         if node.drained:
2548           node.drained = False
2549           result.append(("drained", "clear drained status due to offline"))
2550
2551     if self.op.master_candidate is not None:
2552       node.master_candidate = self.op.master_candidate
2553       changed_mc = True
2554       result.append(("master_candidate", str(self.op.master_candidate)))
2555       if self.op.master_candidate == False:
2556         rrc = self.rpc.call_node_demote_from_mc(node.name)
2557         msg = rrc.RemoteFailMsg()
2558         if msg:
2559           self.LogWarning("Node failed to demote itself: %s" % msg)
2560
2561     if self.op.drained is not None:
2562       node.drained = self.op.drained
2563       result.append(("drained", str(self.op.drained)))
2564       if self.op.drained == True:
2565         if node.master_candidate:
2566           node.master_candidate = False
2567           changed_mc = True
2568           result.append(("master_candidate", "auto-demotion due to drain"))
2569           rrc = self.rpc.call_node_demote_from_mc(node.name)
2570           msg = rrc.RemoteFailMsg()
2571           if msg:
2572             self.LogWarning("Node failed to demote itself: %s" % msg)
2573         if node.offline:
2574           node.offline = False
2575           result.append(("offline", "clear offline status due to drain"))
2576
2577     # this will trigger configuration file update, if needed
2578     self.cfg.Update(node)
2579     # this will trigger job queue propagation or cleanup
2580     if changed_mc:
2581       self.context.ReaddNode(node)
2582
2583     return result
2584
2585
2586 class LUQueryClusterInfo(NoHooksLU):
2587   """Query cluster configuration.
2588
2589   """
2590   _OP_REQP = []
2591   REQ_BGL = False
2592
2593   def ExpandNames(self):
2594     self.needed_locks = {}
2595
2596   def CheckPrereq(self):
2597     """No prerequsites needed for this LU.
2598
2599     """
2600     pass
2601
2602   def Exec(self, feedback_fn):
2603     """Return cluster config.
2604
2605     """
2606     cluster = self.cfg.GetClusterInfo()
2607     result = {
2608       "software_version": constants.RELEASE_VERSION,
2609       "protocol_version": constants.PROTOCOL_VERSION,
2610       "config_version": constants.CONFIG_VERSION,
2611       "os_api_version": constants.OS_API_VERSION,
2612       "export_version": constants.EXPORT_VERSION,
2613       "architecture": (platform.architecture()[0], platform.machine()),
2614       "name": cluster.cluster_name,
2615       "master": cluster.master_node,
2616       "default_hypervisor": cluster.default_hypervisor,
2617       "enabled_hypervisors": cluster.enabled_hypervisors,
2618       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2619                         for hypervisor_name in cluster.enabled_hypervisors]),
2620       "beparams": cluster.beparams,
2621       "candidate_pool_size": cluster.candidate_pool_size,
2622       "default_bridge": cluster.default_bridge,
2623       "master_netdev": cluster.master_netdev,
2624       "volume_group_name": cluster.volume_group_name,
2625       "file_storage_dir": cluster.file_storage_dir,
2626       "tags": list(cluster.GetTags()),
2627       }
2628
2629     return result
2630
2631
2632 class LUQueryConfigValues(NoHooksLU):
2633   """Return configuration values.
2634
2635   """
2636   _OP_REQP = []
2637   REQ_BGL = False
2638   _FIELDS_DYNAMIC = utils.FieldSet()
2639   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2640
2641   def ExpandNames(self):
2642     self.needed_locks = {}
2643
2644     _CheckOutputFields(static=self._FIELDS_STATIC,
2645                        dynamic=self._FIELDS_DYNAMIC,
2646                        selected=self.op.output_fields)
2647
2648   def CheckPrereq(self):
2649     """No prerequisites.
2650
2651     """
2652     pass
2653
2654   def Exec(self, feedback_fn):
2655     """Dump a representation of the cluster config to the standard output.
2656
2657     """
2658     values = []
2659     for field in self.op.output_fields:
2660       if field == "cluster_name":
2661         entry = self.cfg.GetClusterName()
2662       elif field == "master_node":
2663         entry = self.cfg.GetMasterNode()
2664       elif field == "drain_flag":
2665         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2666       else:
2667         raise errors.ParameterError(field)
2668       values.append(entry)
2669     return values
2670
2671
2672 class LUActivateInstanceDisks(NoHooksLU):
2673   """Bring up an instance's disks.
2674
2675   """
2676   _OP_REQP = ["instance_name"]
2677   REQ_BGL = False
2678
2679   def ExpandNames(self):
2680     self._ExpandAndLockInstance()
2681     self.needed_locks[locking.LEVEL_NODE] = []
2682     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2683
2684   def DeclareLocks(self, level):
2685     if level == locking.LEVEL_NODE:
2686       self._LockInstancesNodes()
2687
2688   def CheckPrereq(self):
2689     """Check prerequisites.
2690
2691     This checks that the instance is in the cluster.
2692
2693     """
2694     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2695     assert self.instance is not None, \
2696       "Cannot retrieve locked instance %s" % self.op.instance_name
2697     _CheckNodeOnline(self, self.instance.primary_node)
2698     if not hasattr(self.op, "ignore_size"):
2699       self.op.ignore_size = False
2700
2701   def Exec(self, feedback_fn):
2702     """Activate the disks.
2703
2704     """
2705     disks_ok, disks_info = \
2706               _AssembleInstanceDisks(self, self.instance,
2707                                      ignore_size=self.op.ignore_size)
2708     if not disks_ok:
2709       raise errors.OpExecError("Cannot activate block devices")
2710
2711     return disks_info
2712
2713
2714 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2715                            ignore_size=False):
2716   """Prepare the block devices for an instance.
2717
2718   This sets up the block devices on all nodes.
2719
2720   @type lu: L{LogicalUnit}
2721   @param lu: the logical unit on whose behalf we execute
2722   @type instance: L{objects.Instance}
2723   @param instance: the instance for whose disks we assemble
2724   @type ignore_secondaries: boolean
2725   @param ignore_secondaries: if true, errors on secondary nodes
2726       won't result in an error return from the function
2727   @type ignore_size: boolean
2728   @param ignore_size: if true, the current known size of the disk
2729       will not be used during the disk activation, useful for cases
2730       when the size is wrong
2731   @return: False if the operation failed, otherwise a list of
2732       (host, instance_visible_name, node_visible_name)
2733       with the mapping from node devices to instance devices
2734
2735   """
2736   device_info = []
2737   disks_ok = True
2738   iname = instance.name
2739   # With the two passes mechanism we try to reduce the window of
2740   # opportunity for the race condition of switching DRBD to primary
2741   # before handshaking occured, but we do not eliminate it
2742
2743   # The proper fix would be to wait (with some limits) until the
2744   # connection has been made and drbd transitions from WFConnection
2745   # into any other network-connected state (Connected, SyncTarget,
2746   # SyncSource, etc.)
2747
2748   # 1st pass, assemble on all nodes in secondary mode
2749   for inst_disk in instance.disks:
2750     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2751       if ignore_size:
2752         node_disk = node_disk.Copy()
2753         node_disk.UnsetSize()
2754       lu.cfg.SetDiskID(node_disk, node)
2755       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2756       msg = result.RemoteFailMsg()
2757       if msg:
2758         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2759                            " (is_primary=False, pass=1): %s",
2760                            inst_disk.iv_name, node, msg)
2761         if not ignore_secondaries:
2762           disks_ok = False
2763
2764   # FIXME: race condition on drbd migration to primary
2765
2766   # 2nd pass, do only the primary node
2767   for inst_disk in instance.disks:
2768     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2769       if node != instance.primary_node:
2770         continue
2771       if ignore_size:
2772         node_disk = node_disk.Copy()
2773         node_disk.UnsetSize()
2774       lu.cfg.SetDiskID(node_disk, node)
2775       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2776       msg = result.RemoteFailMsg()
2777       if msg:
2778         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2779                            " (is_primary=True, pass=2): %s",
2780                            inst_disk.iv_name, node, msg)
2781         disks_ok = False
2782     device_info.append((instance.primary_node, inst_disk.iv_name,
2783                         result.payload))
2784
2785   # leave the disks configured for the primary node
2786   # this is a workaround that would be fixed better by
2787   # improving the logical/physical id handling
2788   for disk in instance.disks:
2789     lu.cfg.SetDiskID(disk, instance.primary_node)
2790
2791   return disks_ok, device_info
2792
2793
2794 def _StartInstanceDisks(lu, instance, force):
2795   """Start the disks of an instance.
2796
2797   """
2798   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2799                                            ignore_secondaries=force)
2800   if not disks_ok:
2801     _ShutdownInstanceDisks(lu, instance)
2802     if force is not None and not force:
2803       lu.proc.LogWarning("", hint="If the message above refers to a"
2804                          " secondary node,"
2805                          " you can retry the operation using '--force'.")
2806     raise errors.OpExecError("Disk consistency error")
2807
2808
2809 class LUDeactivateInstanceDisks(NoHooksLU):
2810   """Shutdown an instance's disks.
2811
2812   """
2813   _OP_REQP = ["instance_name"]
2814   REQ_BGL = False
2815
2816   def ExpandNames(self):
2817     self._ExpandAndLockInstance()
2818     self.needed_locks[locking.LEVEL_NODE] = []
2819     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2820
2821   def DeclareLocks(self, level):
2822     if level == locking.LEVEL_NODE:
2823       self._LockInstancesNodes()
2824
2825   def CheckPrereq(self):
2826     """Check prerequisites.
2827
2828     This checks that the instance is in the cluster.
2829
2830     """
2831     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2832     assert self.instance is not None, \
2833       "Cannot retrieve locked instance %s" % self.op.instance_name
2834
2835   def Exec(self, feedback_fn):
2836     """Deactivate the disks
2837
2838     """
2839     instance = self.instance
2840     _SafeShutdownInstanceDisks(self, instance)
2841
2842
2843 def _SafeShutdownInstanceDisks(lu, instance):
2844   """Shutdown block devices of an instance.
2845
2846   This function checks if an instance is running, before calling
2847   _ShutdownInstanceDisks.
2848
2849   """
2850   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2851                                       [instance.hypervisor])
2852   ins_l = ins_l[instance.primary_node]
2853   if ins_l.failed or not isinstance(ins_l.data, list):
2854     raise errors.OpExecError("Can't contact node '%s'" %
2855                              instance.primary_node)
2856
2857   if instance.name in ins_l.data:
2858     raise errors.OpExecError("Instance is running, can't shutdown"
2859                              " block devices.")
2860
2861   _ShutdownInstanceDisks(lu, instance)
2862
2863
2864 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2865   """Shutdown block devices of an instance.
2866
2867   This does the shutdown on all nodes of the instance.
2868
2869   If the ignore_primary is false, errors on the primary node are
2870   ignored.
2871
2872   """
2873   all_result = True
2874   for disk in instance.disks:
2875     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2876       lu.cfg.SetDiskID(top_disk, node)
2877       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2878       msg = result.RemoteFailMsg()
2879       if msg:
2880         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2881                       disk.iv_name, node, msg)
2882         if not ignore_primary or node != instance.primary_node:
2883           all_result = False
2884   return all_result
2885
2886
2887 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2888   """Checks if a node has enough free memory.
2889
2890   This function check if a given node has the needed amount of free
2891   memory. In case the node has less memory or we cannot get the
2892   information from the node, this function raise an OpPrereqError
2893   exception.
2894
2895   @type lu: C{LogicalUnit}
2896   @param lu: a logical unit from which we get configuration data
2897   @type node: C{str}
2898   @param node: the node to check
2899   @type reason: C{str}
2900   @param reason: string to use in the error message
2901   @type requested: C{int}
2902   @param requested: the amount of memory in MiB to check for
2903   @type hypervisor_name: C{str}
2904   @param hypervisor_name: the hypervisor to ask for memory stats
2905   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2906       we cannot check the node
2907
2908   """
2909   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2910   nodeinfo[node].Raise()
2911   free_mem = nodeinfo[node].data.get('memory_free')
2912   if not isinstance(free_mem, int):
2913     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2914                              " was '%s'" % (node, free_mem))
2915   if requested > free_mem:
2916     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2917                              " needed %s MiB, available %s MiB" %
2918                              (node, reason, requested, free_mem))
2919
2920
2921 class LUStartupInstance(LogicalUnit):
2922   """Starts an instance.
2923
2924   """
2925   HPATH = "instance-start"
2926   HTYPE = constants.HTYPE_INSTANCE
2927   _OP_REQP = ["instance_name", "force"]
2928   REQ_BGL = False
2929
2930   def ExpandNames(self):
2931     self._ExpandAndLockInstance()
2932
2933   def BuildHooksEnv(self):
2934     """Build hooks env.
2935
2936     This runs on master, primary and secondary nodes of the instance.
2937
2938     """
2939     env = {
2940       "FORCE": self.op.force,
2941       }
2942     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2943     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2944     return env, nl, nl
2945
2946   def CheckPrereq(self):
2947     """Check prerequisites.
2948
2949     This checks that the instance is in the cluster.
2950
2951     """
2952     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2953     assert self.instance is not None, \
2954       "Cannot retrieve locked instance %s" % self.op.instance_name
2955
2956     # extra beparams
2957     self.beparams = getattr(self.op, "beparams", {})
2958     if self.beparams:
2959       if not isinstance(self.beparams, dict):
2960         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2961                                    " dict" % (type(self.beparams), ))
2962       # fill the beparams dict
2963       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2964       self.op.beparams = self.beparams
2965
2966     # extra hvparams
2967     self.hvparams = getattr(self.op, "hvparams", {})
2968     if self.hvparams:
2969       if not isinstance(self.hvparams, dict):
2970         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2971                                    " dict" % (type(self.hvparams), ))
2972
2973       # check hypervisor parameter syntax (locally)
2974       cluster = self.cfg.GetClusterInfo()
2975       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2976       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
2977                                     instance.hvparams)
2978       filled_hvp.update(self.hvparams)
2979       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2980       hv_type.CheckParameterSyntax(filled_hvp)
2981       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2982       self.op.hvparams = self.hvparams
2983
2984     _CheckNodeOnline(self, instance.primary_node)
2985
2986     bep = self.cfg.GetClusterInfo().FillBE(instance)
2987     # check bridges existence
2988     _CheckInstanceBridgesExist(self, instance)
2989
2990     remote_info = self.rpc.call_instance_info(instance.primary_node,
2991                                               instance.name,
2992                                               instance.hypervisor)
2993     remote_info.Raise()
2994     if not remote_info.data:
2995       _CheckNodeFreeMemory(self, instance.primary_node,
2996                            "starting instance %s" % instance.name,
2997                            bep[constants.BE_MEMORY], instance.hypervisor)
2998
2999   def Exec(self, feedback_fn):
3000     """Start the instance.
3001
3002     """
3003     instance = self.instance
3004     force = self.op.force
3005
3006     self.cfg.MarkInstanceUp(instance.name)
3007
3008     node_current = instance.primary_node
3009
3010     _StartInstanceDisks(self, instance, force)
3011
3012     result = self.rpc.call_instance_start(node_current, instance,
3013                                           self.hvparams, self.beparams)
3014     msg = result.RemoteFailMsg()
3015     if msg:
3016       _ShutdownInstanceDisks(self, instance)
3017       raise errors.OpExecError("Could not start instance: %s" % msg)
3018
3019
3020 class LURebootInstance(LogicalUnit):
3021   """Reboot an instance.
3022
3023   """
3024   HPATH = "instance-reboot"
3025   HTYPE = constants.HTYPE_INSTANCE
3026   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3027   REQ_BGL = False
3028
3029   def ExpandNames(self):
3030     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3031                                    constants.INSTANCE_REBOOT_HARD,
3032                                    constants.INSTANCE_REBOOT_FULL]:
3033       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3034                                   (constants.INSTANCE_REBOOT_SOFT,
3035                                    constants.INSTANCE_REBOOT_HARD,
3036                                    constants.INSTANCE_REBOOT_FULL))
3037     self._ExpandAndLockInstance()
3038
3039   def BuildHooksEnv(self):
3040     """Build hooks env.
3041
3042     This runs on master, primary and secondary nodes of the instance.
3043
3044     """
3045     env = {
3046       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3047       "REBOOT_TYPE": self.op.reboot_type,
3048       }
3049     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3050     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3051     return env, nl, nl
3052
3053   def CheckPrereq(self):
3054     """Check prerequisites.
3055
3056     This checks that the instance is in the cluster.
3057
3058     """
3059     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3060     assert self.instance is not None, \
3061       "Cannot retrieve locked instance %s" % self.op.instance_name
3062
3063     _CheckNodeOnline(self, instance.primary_node)
3064
3065     # check bridges existence
3066     _CheckInstanceBridgesExist(self, instance)
3067
3068   def Exec(self, feedback_fn):
3069     """Reboot the instance.
3070
3071     """
3072     instance = self.instance
3073     ignore_secondaries = self.op.ignore_secondaries
3074     reboot_type = self.op.reboot_type
3075
3076     node_current = instance.primary_node
3077
3078     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3079                        constants.INSTANCE_REBOOT_HARD]:
3080       for disk in instance.disks:
3081         self.cfg.SetDiskID(disk, node_current)
3082       result = self.rpc.call_instance_reboot(node_current, instance,
3083                                              reboot_type)
3084       msg = result.RemoteFailMsg()
3085       if msg:
3086         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3087     else:
3088       result = self.rpc.call_instance_shutdown(node_current, instance)
3089       msg = result.RemoteFailMsg()
3090       if msg:
3091         raise errors.OpExecError("Could not shutdown instance for"
3092                                  " full reboot: %s" % msg)
3093       _ShutdownInstanceDisks(self, instance)
3094       _StartInstanceDisks(self, instance, ignore_secondaries)
3095       result = self.rpc.call_instance_start(node_current, instance, None, None)
3096       msg = result.RemoteFailMsg()
3097       if msg:
3098         _ShutdownInstanceDisks(self, instance)
3099         raise errors.OpExecError("Could not start instance for"
3100                                  " full reboot: %s" % msg)
3101
3102     self.cfg.MarkInstanceUp(instance.name)
3103
3104
3105 class LUShutdownInstance(LogicalUnit):
3106   """Shutdown an instance.
3107
3108   """
3109   HPATH = "instance-stop"
3110   HTYPE = constants.HTYPE_INSTANCE
3111   _OP_REQP = ["instance_name"]
3112   REQ_BGL = False
3113
3114   def ExpandNames(self):
3115     self._ExpandAndLockInstance()
3116
3117   def BuildHooksEnv(self):
3118     """Build hooks env.
3119
3120     This runs on master, primary and secondary nodes of the instance.
3121
3122     """
3123     env = _BuildInstanceHookEnvByObject(self, self.instance)
3124     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3125     return env, nl, nl
3126
3127   def CheckPrereq(self):
3128     """Check prerequisites.
3129
3130     This checks that the instance is in the cluster.
3131
3132     """
3133     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3134     assert self.instance is not None, \
3135       "Cannot retrieve locked instance %s" % self.op.instance_name
3136     _CheckNodeOnline(self, self.instance.primary_node)
3137
3138   def Exec(self, feedback_fn):
3139     """Shutdown the instance.
3140
3141     """
3142     instance = self.instance
3143     node_current = instance.primary_node
3144     self.cfg.MarkInstanceDown(instance.name)
3145     result = self.rpc.call_instance_shutdown(node_current, instance)
3146     msg = result.RemoteFailMsg()
3147     if msg:
3148       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3149
3150     _ShutdownInstanceDisks(self, instance)
3151
3152
3153 class LUReinstallInstance(LogicalUnit):
3154   """Reinstall an instance.
3155
3156   """
3157   HPATH = "instance-reinstall"
3158   HTYPE = constants.HTYPE_INSTANCE
3159   _OP_REQP = ["instance_name"]
3160   REQ_BGL = False
3161
3162   def ExpandNames(self):
3163     self._ExpandAndLockInstance()
3164
3165   def BuildHooksEnv(self):
3166     """Build hooks env.
3167
3168     This runs on master, primary and secondary nodes of the instance.
3169
3170     """
3171     env = _BuildInstanceHookEnvByObject(self, self.instance)
3172     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3173     return env, nl, nl
3174
3175   def CheckPrereq(self):
3176     """Check prerequisites.
3177
3178     This checks that the instance is in the cluster and is not running.
3179
3180     """
3181     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3182     assert instance is not None, \
3183       "Cannot retrieve locked instance %s" % self.op.instance_name
3184     _CheckNodeOnline(self, instance.primary_node)
3185
3186     if instance.disk_template == constants.DT_DISKLESS:
3187       raise errors.OpPrereqError("Instance '%s' has no disks" %
3188                                  self.op.instance_name)
3189     if instance.admin_up:
3190       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3191                                  self.op.instance_name)
3192     remote_info = self.rpc.call_instance_info(instance.primary_node,
3193                                               instance.name,
3194                                               instance.hypervisor)
3195     remote_info.Raise()
3196     if remote_info.data:
3197       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3198                                  (self.op.instance_name,
3199                                   instance.primary_node))
3200
3201     self.op.os_type = getattr(self.op, "os_type", None)
3202     if self.op.os_type is not None:
3203       # OS verification
3204       pnode = self.cfg.GetNodeInfo(
3205         self.cfg.ExpandNodeName(instance.primary_node))
3206       if pnode is None:
3207         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3208                                    self.op.pnode)
3209       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3210       result.Raise()
3211       if not isinstance(result.data, objects.OS):
3212         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3213                                    " primary node"  % self.op.os_type)
3214
3215     self.instance = instance
3216
3217   def Exec(self, feedback_fn):
3218     """Reinstall the instance.
3219
3220     """
3221     inst = self.instance
3222
3223     if self.op.os_type is not None:
3224       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3225       inst.os = self.op.os_type
3226       self.cfg.Update(inst)
3227
3228     _StartInstanceDisks(self, inst, None)
3229     try:
3230       feedback_fn("Running the instance OS create scripts...")
3231       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3232       msg = result.RemoteFailMsg()
3233       if msg:
3234         raise errors.OpExecError("Could not install OS for instance %s"
3235                                  " on node %s: %s" %
3236                                  (inst.name, inst.primary_node, msg))
3237     finally:
3238       _ShutdownInstanceDisks(self, inst)
3239
3240
3241 class LURenameInstance(LogicalUnit):
3242   """Rename an instance.
3243
3244   """
3245   HPATH = "instance-rename"
3246   HTYPE = constants.HTYPE_INSTANCE
3247   _OP_REQP = ["instance_name", "new_name"]
3248
3249   def BuildHooksEnv(self):
3250     """Build hooks env.
3251
3252     This runs on master, primary and secondary nodes of the instance.
3253
3254     """
3255     env = _BuildInstanceHookEnvByObject(self, self.instance)
3256     env["INSTANCE_NEW_NAME"] = self.op.new_name
3257     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3258     return env, nl, nl
3259
3260   def CheckPrereq(self):
3261     """Check prerequisites.
3262
3263     This checks that the instance is in the cluster and is not running.
3264
3265     """
3266     instance = self.cfg.GetInstanceInfo(
3267       self.cfg.ExpandInstanceName(self.op.instance_name))
3268     if instance is None:
3269       raise errors.OpPrereqError("Instance '%s' not known" %
3270                                  self.op.instance_name)
3271     _CheckNodeOnline(self, instance.primary_node)
3272
3273     if instance.admin_up:
3274       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3275                                  self.op.instance_name)
3276     remote_info = self.rpc.call_instance_info(instance.primary_node,
3277                                               instance.name,
3278                                               instance.hypervisor)
3279     remote_info.Raise()
3280     if remote_info.data:
3281       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3282                                  (self.op.instance_name,
3283                                   instance.primary_node))
3284     self.instance = instance
3285
3286     # new name verification
3287     name_info = utils.HostInfo(self.op.new_name)
3288
3289     self.op.new_name = new_name = name_info.name
3290     instance_list = self.cfg.GetInstanceList()
3291     if new_name in instance_list:
3292       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3293                                  new_name)
3294
3295     if not getattr(self.op, "ignore_ip", False):
3296       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3297         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3298                                    (name_info.ip, new_name))
3299
3300
3301   def Exec(self, feedback_fn):
3302     """Reinstall the instance.
3303
3304     """
3305     inst = self.instance
3306     old_name = inst.name
3307
3308     if inst.disk_template == constants.DT_FILE:
3309       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3310
3311     self.cfg.RenameInstance(inst.name, self.op.new_name)
3312     # Change the instance lock. This is definitely safe while we hold the BGL
3313     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3314     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3315
3316     # re-read the instance from the configuration after rename
3317     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3318
3319     if inst.disk_template == constants.DT_FILE:
3320       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3321       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3322                                                      old_file_storage_dir,
3323                                                      new_file_storage_dir)
3324       result.Raise()
3325       if not result.data:
3326         raise errors.OpExecError("Could not connect to node '%s' to rename"
3327                                  " directory '%s' to '%s' (but the instance"
3328                                  " has been renamed in Ganeti)" % (
3329                                  inst.primary_node, old_file_storage_dir,
3330                                  new_file_storage_dir))
3331
3332       if not result.data[0]:
3333         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3334                                  " (but the instance has been renamed in"
3335                                  " Ganeti)" % (old_file_storage_dir,
3336                                                new_file_storage_dir))
3337
3338     _StartInstanceDisks(self, inst, None)
3339     try:
3340       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3341                                                  old_name)
3342       msg = result.RemoteFailMsg()
3343       if msg:
3344         msg = ("Could not run OS rename script for instance %s on node %s"
3345                " (but the instance has been renamed in Ganeti): %s" %
3346                (inst.name, inst.primary_node, msg))
3347         self.proc.LogWarning(msg)
3348     finally:
3349       _ShutdownInstanceDisks(self, inst)
3350
3351
3352 class LURemoveInstance(LogicalUnit):
3353   """Remove an instance.
3354
3355   """
3356   HPATH = "instance-remove"
3357   HTYPE = constants.HTYPE_INSTANCE
3358   _OP_REQP = ["instance_name", "ignore_failures"]
3359   REQ_BGL = False
3360
3361   def ExpandNames(self):
3362     self._ExpandAndLockInstance()
3363     self.needed_locks[locking.LEVEL_NODE] = []
3364     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3365
3366   def DeclareLocks(self, level):
3367     if level == locking.LEVEL_NODE:
3368       self._LockInstancesNodes()
3369
3370   def BuildHooksEnv(self):
3371     """Build hooks env.
3372
3373     This runs on master, primary and secondary nodes of the instance.
3374
3375     """
3376     env = _BuildInstanceHookEnvByObject(self, self.instance)
3377     nl = [self.cfg.GetMasterNode()]
3378     return env, nl, nl
3379
3380   def CheckPrereq(self):
3381     """Check prerequisites.
3382
3383     This checks that the instance is in the cluster.
3384
3385     """
3386     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3387     assert self.instance is not None, \
3388       "Cannot retrieve locked instance %s" % self.op.instance_name
3389
3390   def Exec(self, feedback_fn):
3391     """Remove the instance.
3392
3393     """
3394     instance = self.instance
3395     logging.info("Shutting down instance %s on node %s",
3396                  instance.name, instance.primary_node)
3397
3398     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3399     msg = result.RemoteFailMsg()
3400     if msg:
3401       if self.op.ignore_failures:
3402         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3403       else:
3404         raise errors.OpExecError("Could not shutdown instance %s on"
3405                                  " node %s: %s" %
3406                                  (instance.name, instance.primary_node, msg))
3407
3408     logging.info("Removing block devices for instance %s", instance.name)
3409
3410     if not _RemoveDisks(self, instance):
3411       if self.op.ignore_failures:
3412         feedback_fn("Warning: can't remove instance's disks")
3413       else:
3414         raise errors.OpExecError("Can't remove instance's disks")
3415
3416     logging.info("Removing instance %s out of cluster config", instance.name)
3417
3418     self.cfg.RemoveInstance(instance.name)
3419     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3420
3421
3422 class LUQueryInstances(NoHooksLU):
3423   """Logical unit for querying instances.
3424
3425   """
3426   _OP_REQP = ["output_fields", "names", "use_locking"]
3427   REQ_BGL = False
3428   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3429                                     "admin_state",
3430                                     "disk_template", "ip", "mac", "bridge",
3431                                     "sda_size", "sdb_size", "vcpus", "tags",
3432                                     "network_port", "beparams",
3433                                     r"(disk)\.(size)/([0-9]+)",
3434                                     r"(disk)\.(sizes)", "disk_usage",
3435                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3436                                     r"(nic)\.(macs|ips|bridges)",
3437                                     r"(disk|nic)\.(count)",
3438                                     "serial_no", "hypervisor", "hvparams",] +
3439                                   ["hv/%s" % name
3440                                    for name in constants.HVS_PARAMETERS] +
3441                                   ["be/%s" % name
3442                                    for name in constants.BES_PARAMETERS])
3443   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3444
3445
3446   def ExpandNames(self):
3447     _CheckOutputFields(static=self._FIELDS_STATIC,
3448                        dynamic=self._FIELDS_DYNAMIC,
3449                        selected=self.op.output_fields)
3450
3451     self.needed_locks = {}
3452     self.share_locks[locking.LEVEL_INSTANCE] = 1
3453     self.share_locks[locking.LEVEL_NODE] = 1
3454
3455     if self.op.names:
3456       self.wanted = _GetWantedInstances(self, self.op.names)
3457     else:
3458       self.wanted = locking.ALL_SET
3459
3460     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3461     self.do_locking = self.do_node_query and self.op.use_locking
3462     if self.do_locking:
3463       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3464       self.needed_locks[locking.LEVEL_NODE] = []
3465       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3466
3467   def DeclareLocks(self, level):
3468     if level == locking.LEVEL_NODE and self.do_locking:
3469       self._LockInstancesNodes()
3470
3471   def CheckPrereq(self):
3472     """Check prerequisites.
3473
3474     """
3475     pass
3476
3477   def Exec(self, feedback_fn):
3478     """Computes the list of nodes and their attributes.
3479
3480     """
3481     all_info = self.cfg.GetAllInstancesInfo()
3482     if self.wanted == locking.ALL_SET:
3483       # caller didn't specify instance names, so ordering is not important
3484       if self.do_locking:
3485         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3486       else:
3487         instance_names = all_info.keys()
3488       instance_names = utils.NiceSort(instance_names)
3489     else:
3490       # caller did specify names, so we must keep the ordering
3491       if self.do_locking:
3492         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3493       else:
3494         tgt_set = all_info.keys()
3495       missing = set(self.wanted).difference(tgt_set)
3496       if missing:
3497         raise errors.OpExecError("Some instances were removed before"
3498                                  " retrieving their data: %s" % missing)
3499       instance_names = self.wanted
3500
3501     instance_list = [all_info[iname] for iname in instance_names]
3502
3503     # begin data gathering
3504
3505     nodes = frozenset([inst.primary_node for inst in instance_list])
3506     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3507
3508     bad_nodes = []
3509     off_nodes = []
3510     if self.do_node_query:
3511       live_data = {}
3512       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3513       for name in nodes:
3514         result = node_data[name]
3515         if result.offline:
3516           # offline nodes will be in both lists
3517           off_nodes.append(name)
3518         if result.failed:
3519           bad_nodes.append(name)
3520         else:
3521           if result.data:
3522             live_data.update(result.data)
3523             # else no instance is alive
3524     else:
3525       live_data = dict([(name, {}) for name in instance_names])
3526
3527     # end data gathering
3528
3529     HVPREFIX = "hv/"
3530     BEPREFIX = "be/"
3531     output = []
3532     for instance in instance_list:
3533       iout = []
3534       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3535       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3536       for field in self.op.output_fields:
3537         st_match = self._FIELDS_STATIC.Matches(field)
3538         if field == "name":
3539           val = instance.name
3540         elif field == "os":
3541           val = instance.os
3542         elif field == "pnode":
3543           val = instance.primary_node
3544         elif field == "snodes":
3545           val = list(instance.secondary_nodes)
3546         elif field == "admin_state":
3547           val = instance.admin_up
3548         elif field == "oper_state":
3549           if instance.primary_node in bad_nodes:
3550             val = None
3551           else:
3552             val = bool(live_data.get(instance.name))
3553         elif field == "status":
3554           if instance.primary_node in off_nodes:
3555             val = "ERROR_nodeoffline"
3556           elif instance.primary_node in bad_nodes:
3557             val = "ERROR_nodedown"
3558           else:
3559             running = bool(live_data.get(instance.name))
3560             if running:
3561               if instance.admin_up:
3562                 val = "running"
3563               else:
3564                 val = "ERROR_up"
3565             else:
3566               if instance.admin_up:
3567                 val = "ERROR_down"
3568               else:
3569                 val = "ADMIN_down"
3570         elif field == "oper_ram":
3571           if instance.primary_node in bad_nodes:
3572             val = None
3573           elif instance.name in live_data:
3574             val = live_data[instance.name].get("memory", "?")
3575           else:
3576             val = "-"
3577         elif field == "vcpus":
3578           val = i_be[constants.BE_VCPUS]
3579         elif field == "disk_template":
3580           val = instance.disk_template
3581         elif field == "ip":
3582           if instance.nics:
3583             val = instance.nics[0].ip
3584           else:
3585             val = None
3586         elif field == "bridge":
3587           if instance.nics:
3588             val = instance.nics[0].bridge
3589           else:
3590             val = None
3591         elif field == "mac":
3592           if instance.nics:
3593             val = instance.nics[0].mac
3594           else:
3595             val = None
3596         elif field == "sda_size" or field == "sdb_size":
3597           idx = ord(field[2]) - ord('a')
3598           try:
3599             val = instance.FindDisk(idx).size
3600           except errors.OpPrereqError:
3601             val = None
3602         elif field == "disk_usage": # total disk usage per node
3603           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3604           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3605         elif field == "tags":
3606           val = list(instance.GetTags())
3607         elif field == "serial_no":
3608           val = instance.serial_no
3609         elif field == "network_port":
3610           val = instance.network_port
3611         elif field == "hypervisor":
3612           val = instance.hypervisor
3613         elif field == "hvparams":
3614           val = i_hv
3615         elif (field.startswith(HVPREFIX) and
3616               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3617           val = i_hv.get(field[len(HVPREFIX):], None)
3618         elif field == "beparams":
3619           val = i_be
3620         elif (field.startswith(BEPREFIX) and
3621               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3622           val = i_be.get(field[len(BEPREFIX):], None)
3623         elif st_match and st_match.groups():
3624           # matches a variable list
3625           st_groups = st_match.groups()
3626           if st_groups and st_groups[0] == "disk":
3627             if st_groups[1] == "count":
3628               val = len(instance.disks)
3629             elif st_groups[1] == "sizes":
3630               val = [disk.size for disk in instance.disks]
3631             elif st_groups[1] == "size":
3632               try:
3633                 val = instance.FindDisk(st_groups[2]).size
3634               except errors.OpPrereqError:
3635                 val = None
3636             else:
3637               assert False, "Unhandled disk parameter"
3638           elif st_groups[0] == "nic":
3639             if st_groups[1] == "count":
3640               val = len(instance.nics)
3641             elif st_groups[1] == "macs":
3642               val = [nic.mac for nic in instance.nics]
3643             elif st_groups[1] == "ips":
3644               val = [nic.ip for nic in instance.nics]
3645             elif st_groups[1] == "bridges":
3646               val = [nic.bridge for nic in instance.nics]
3647             else:
3648               # index-based item
3649               nic_idx = int(st_groups[2])
3650               if nic_idx >= len(instance.nics):
3651                 val = None
3652               else:
3653                 if st_groups[1] == "mac":
3654                   val = instance.nics[nic_idx].mac
3655                 elif st_groups[1] == "ip":
3656                   val = instance.nics[nic_idx].ip
3657                 elif st_groups[1] == "bridge":
3658                   val = instance.nics[nic_idx].bridge
3659                 else:
3660                   assert False, "Unhandled NIC parameter"
3661           else:
3662             assert False, ("Declared but unhandled variable parameter '%s'" %
3663                            field)
3664         else:
3665           assert False, "Declared but unhandled parameter '%s'" % field
3666         iout.append(val)
3667       output.append(iout)
3668
3669     return output
3670
3671
3672 class LUFailoverInstance(LogicalUnit):
3673   """Failover an instance.
3674
3675   """
3676   HPATH = "instance-failover"
3677   HTYPE = constants.HTYPE_INSTANCE
3678   _OP_REQP = ["instance_name", "ignore_consistency"]
3679   REQ_BGL = False
3680
3681   def ExpandNames(self):
3682     self._ExpandAndLockInstance()
3683     self.needed_locks[locking.LEVEL_NODE] = []
3684     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3685
3686   def DeclareLocks(self, level):
3687     if level == locking.LEVEL_NODE:
3688       self._LockInstancesNodes()
3689
3690   def BuildHooksEnv(self):
3691     """Build hooks env.
3692
3693     This runs on master, primary and secondary nodes of the instance.
3694
3695     """
3696     env = {
3697       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3698       }
3699     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3700     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3701     return env, nl, nl
3702
3703   def CheckPrereq(self):
3704     """Check prerequisites.
3705
3706     This checks that the instance is in the cluster.
3707
3708     """
3709     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3710     assert self.instance is not None, \
3711       "Cannot retrieve locked instance %s" % self.op.instance_name
3712
3713     bep = self.cfg.GetClusterInfo().FillBE(instance)
3714     if instance.disk_template not in constants.DTS_NET_MIRROR:
3715       raise errors.OpPrereqError("Instance's disk layout is not"
3716                                  " network mirrored, cannot failover.")
3717
3718     secondary_nodes = instance.secondary_nodes
3719     if not secondary_nodes:
3720       raise errors.ProgrammerError("no secondary node but using "
3721                                    "a mirrored disk template")
3722
3723     target_node = secondary_nodes[0]
3724     _CheckNodeOnline(self, target_node)
3725     _CheckNodeNotDrained(self, target_node)
3726
3727     if instance.admin_up:
3728       # check memory requirements on the secondary node
3729       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3730                            instance.name, bep[constants.BE_MEMORY],
3731                            instance.hypervisor)
3732     else:
3733       self.LogInfo("Not checking memory on the secondary node as"
3734                    " instance will not be started")
3735
3736     # check bridge existence
3737     brlist = [nic.bridge for nic in instance.nics]
3738     result = self.rpc.call_bridges_exist(target_node, brlist)
3739     result.Raise()
3740     if not result.data:
3741       raise errors.OpPrereqError("One or more target bridges %s does not"
3742                                  " exist on destination node '%s'" %
3743                                  (brlist, target_node))
3744
3745   def Exec(self, feedback_fn):
3746     """Failover an instance.
3747
3748     The failover is done by shutting it down on its present node and
3749     starting it on the secondary.
3750
3751     """
3752     instance = self.instance
3753
3754     source_node = instance.primary_node
3755     target_node = instance.secondary_nodes[0]
3756
3757     feedback_fn("* checking disk consistency between source and target")
3758     for dev in instance.disks:
3759       # for drbd, these are drbd over lvm
3760       if not _CheckDiskConsistency(self, dev, target_node, False):
3761         if instance.admin_up and not self.op.ignore_consistency:
3762           raise errors.OpExecError("Disk %s is degraded on target node,"
3763                                    " aborting failover." % dev.iv_name)
3764
3765     feedback_fn("* shutting down instance on source node")
3766     logging.info("Shutting down instance %s on node %s",
3767                  instance.name, source_node)
3768
3769     result = self.rpc.call_instance_shutdown(source_node, instance)
3770     msg = result.RemoteFailMsg()
3771     if msg:
3772       if self.op.ignore_consistency:
3773         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3774                              " Proceeding anyway. Please make sure node"
3775                              " %s is down. Error details: %s",
3776                              instance.name, source_node, source_node, msg)
3777       else:
3778         raise errors.OpExecError("Could not shutdown instance %s on"
3779                                  " node %s: %s" %
3780                                  (instance.name, source_node, msg))
3781
3782     feedback_fn("* deactivating the instance's disks on source node")
3783     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3784       raise errors.OpExecError("Can't shut down the instance's disks.")
3785
3786     instance.primary_node = target_node
3787     # distribute new instance config to the other nodes
3788     self.cfg.Update(instance)
3789
3790     # Only start the instance if it's marked as up
3791     if instance.admin_up:
3792       feedback_fn("* activating the instance's disks on target node")
3793       logging.info("Starting instance %s on node %s",
3794                    instance.name, target_node)
3795
3796       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3797                                                ignore_secondaries=True)
3798       if not disks_ok:
3799         _ShutdownInstanceDisks(self, instance)
3800         raise errors.OpExecError("Can't activate the instance's disks")
3801
3802       feedback_fn("* starting the instance on the target node")
3803       result = self.rpc.call_instance_start(target_node, instance, None, None)
3804       msg = result.RemoteFailMsg()
3805       if msg:
3806         _ShutdownInstanceDisks(self, instance)
3807         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3808                                  (instance.name, target_node, msg))
3809
3810
3811 class LUMigrateInstance(LogicalUnit):
3812   """Migrate an instance.
3813
3814   This is migration without shutting down, compared to the failover,
3815   which is done with shutdown.
3816
3817   """
3818   HPATH = "instance-migrate"
3819   HTYPE = constants.HTYPE_INSTANCE
3820   _OP_REQP = ["instance_name", "live", "cleanup"]
3821
3822   REQ_BGL = False
3823
3824   def ExpandNames(self):
3825     self._ExpandAndLockInstance()
3826     self.needed_locks[locking.LEVEL_NODE] = []
3827     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3828
3829   def DeclareLocks(self, level):
3830     if level == locking.LEVEL_NODE:
3831       self._LockInstancesNodes()
3832
3833   def BuildHooksEnv(self):
3834     """Build hooks env.
3835
3836     This runs on master, primary and secondary nodes of the instance.
3837
3838     """
3839     env = _BuildInstanceHookEnvByObject(self, self.instance)
3840     env["MIGRATE_LIVE"] = self.op.live
3841     env["MIGRATE_CLEANUP"] = self.op.cleanup
3842     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3843     return env, nl, nl
3844
3845   def CheckPrereq(self):
3846     """Check prerequisites.
3847
3848     This checks that the instance is in the cluster.
3849
3850     """
3851     instance = self.cfg.GetInstanceInfo(
3852       self.cfg.ExpandInstanceName(self.op.instance_name))
3853     if instance is None:
3854       raise errors.OpPrereqError("Instance '%s' not known" %
3855                                  self.op.instance_name)
3856
3857     if instance.disk_template != constants.DT_DRBD8:
3858       raise errors.OpPrereqError("Instance's disk layout is not"
3859                                  " drbd8, cannot migrate.")
3860
3861     secondary_nodes = instance.secondary_nodes
3862     if not secondary_nodes:
3863       raise errors.ConfigurationError("No secondary node but using"
3864                                       " drbd8 disk template")
3865
3866     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3867
3868     target_node = secondary_nodes[0]
3869     # check memory requirements on the secondary node
3870     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3871                          instance.name, i_be[constants.BE_MEMORY],
3872                          instance.hypervisor)
3873
3874     # check bridge existence
3875     brlist = [nic.bridge for nic in instance.nics]
3876     result = self.rpc.call_bridges_exist(target_node, brlist)
3877     if result.failed or not result.data:
3878       raise errors.OpPrereqError("One or more target bridges %s does not"
3879                                  " exist on destination node '%s'" %
3880                                  (brlist, target_node))
3881
3882     if not self.op.cleanup:
3883       _CheckNodeNotDrained(self, target_node)
3884       result = self.rpc.call_instance_migratable(instance.primary_node,
3885                                                  instance)
3886       msg = result.RemoteFailMsg()
3887       if msg:
3888         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3889                                    msg)
3890
3891     self.instance = instance
3892
3893   def _WaitUntilSync(self):
3894     """Poll with custom rpc for disk sync.
3895
3896     This uses our own step-based rpc call.
3897
3898     """
3899     self.feedback_fn("* wait until resync is done")
3900     all_done = False
3901     while not all_done:
3902       all_done = True
3903       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3904                                             self.nodes_ip,
3905                                             self.instance.disks)
3906       min_percent = 100
3907       for node, nres in result.items():
3908         msg = nres.RemoteFailMsg()
3909         if msg:
3910           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3911                                    (node, msg))
3912         node_done, node_percent = nres.payload
3913         all_done = all_done and node_done
3914         if node_percent is not None:
3915           min_percent = min(min_percent, node_percent)
3916       if not all_done:
3917         if min_percent < 100:
3918           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3919         time.sleep(2)
3920
3921   def _EnsureSecondary(self, node):
3922     """Demote a node to secondary.
3923
3924     """
3925     self.feedback_fn("* switching node %s to secondary mode" % node)
3926
3927     for dev in self.instance.disks:
3928       self.cfg.SetDiskID(dev, node)
3929
3930     result = self.rpc.call_blockdev_close(node, self.instance.name,
3931                                           self.instance.disks)
3932     msg = result.RemoteFailMsg()
3933     if msg:
3934       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3935                                " error %s" % (node, msg))
3936
3937   def _GoStandalone(self):
3938     """Disconnect from the network.
3939
3940     """
3941     self.feedback_fn("* changing into standalone mode")
3942     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3943                                                self.instance.disks)
3944     for node, nres in result.items():
3945       msg = nres.RemoteFailMsg()
3946       if msg:
3947         raise errors.OpExecError("Cannot disconnect disks node %s,"
3948                                  " error %s" % (node, msg))
3949
3950   def _GoReconnect(self, multimaster):
3951     """Reconnect to the network.
3952
3953     """
3954     if multimaster:
3955       msg = "dual-master"
3956     else:
3957       msg = "single-master"
3958     self.feedback_fn("* changing disks into %s mode" % msg)
3959     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3960                                            self.instance.disks,
3961                                            self.instance.name, multimaster)
3962     for node, nres in result.items():
3963       msg = nres.RemoteFailMsg()
3964       if msg:
3965         raise errors.OpExecError("Cannot change disks config on node %s,"
3966                                  " error: %s" % (node, msg))
3967
3968   def _ExecCleanup(self):
3969     """Try to cleanup after a failed migration.
3970
3971     The cleanup is done by:
3972       - check that the instance is running only on one node
3973         (and update the config if needed)
3974       - change disks on its secondary node to secondary
3975       - wait until disks are fully synchronized
3976       - disconnect from the network
3977       - change disks into single-master mode
3978       - wait again until disks are fully synchronized
3979
3980     """
3981     instance = self.instance
3982     target_node = self.target_node
3983     source_node = self.source_node
3984
3985     # check running on only one node
3986     self.feedback_fn("* checking where the instance actually runs"
3987                      " (if this hangs, the hypervisor might be in"
3988                      " a bad state)")
3989     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3990     for node, result in ins_l.items():
3991       result.Raise()
3992       if not isinstance(result.data, list):
3993         raise errors.OpExecError("Can't contact node '%s'" % node)
3994
3995     runningon_source = instance.name in ins_l[source_node].data
3996     runningon_target = instance.name in ins_l[target_node].data
3997
3998     if runningon_source and runningon_target:
3999       raise errors.OpExecError("Instance seems to be running on two nodes,"
4000                                " or the hypervisor is confused. You will have"
4001                                " to ensure manually that it runs only on one"
4002                                " and restart this operation.")
4003
4004     if not (runningon_source or runningon_target):
4005       raise errors.OpExecError("Instance does not seem to be running at all."
4006                                " In this case, it's safer to repair by"
4007                                " running 'gnt-instance stop' to ensure disk"
4008                                " shutdown, and then restarting it.")
4009
4010     if runningon_target:
4011       # the migration has actually succeeded, we need to update the config
4012       self.feedback_fn("* instance running on secondary node (%s),"
4013                        " updating config" % target_node)
4014       instance.primary_node = target_node
4015       self.cfg.Update(instance)
4016       demoted_node = source_node
4017     else:
4018       self.feedback_fn("* instance confirmed to be running on its"
4019                        " primary node (%s)" % source_node)
4020       demoted_node = target_node
4021
4022     self._EnsureSecondary(demoted_node)
4023     try:
4024       self._WaitUntilSync()
4025     except errors.OpExecError:
4026       # we ignore here errors, since if the device is standalone, it
4027       # won't be able to sync
4028       pass
4029     self._GoStandalone()
4030     self._GoReconnect(False)
4031     self._WaitUntilSync()
4032
4033     self.feedback_fn("* done")
4034
4035   def _RevertDiskStatus(self):
4036     """Try to revert the disk status after a failed migration.
4037
4038     """
4039     target_node = self.target_node
4040     try:
4041       self._EnsureSecondary(target_node)
4042       self._GoStandalone()
4043       self._GoReconnect(False)
4044       self._WaitUntilSync()
4045     except errors.OpExecError, err:
4046       self.LogWarning("Migration failed and I can't reconnect the"
4047                       " drives: error '%s'\n"
4048                       "Please look and recover the instance status" %
4049                       str(err))
4050
4051   def _AbortMigration(self):
4052     """Call the hypervisor code to abort a started migration.
4053
4054     """
4055     instance = self.instance
4056     target_node = self.target_node
4057     migration_info = self.migration_info
4058
4059     abort_result = self.rpc.call_finalize_migration(target_node,
4060                                                     instance,
4061                                                     migration_info,
4062                                                     False)
4063     abort_msg = abort_result.RemoteFailMsg()
4064     if abort_msg:
4065       logging.error("Aborting migration failed on target node %s: %s" %
4066                     (target_node, abort_msg))
4067       # Don't raise an exception here, as we stil have to try to revert the
4068       # disk status, even if this step failed.
4069
4070   def _ExecMigration(self):
4071     """Migrate an instance.
4072
4073     The migrate is done by:
4074       - change the disks into dual-master mode
4075       - wait until disks are fully synchronized again
4076       - migrate the instance
4077       - change disks on the new secondary node (the old primary) to secondary
4078       - wait until disks are fully synchronized
4079       - change disks into single-master mode
4080
4081     """
4082     instance = self.instance
4083     target_node = self.target_node
4084     source_node = self.source_node
4085
4086     self.feedback_fn("* checking disk consistency between source and target")
4087     for dev in instance.disks:
4088       if not _CheckDiskConsistency(self, dev, target_node, False):
4089         raise errors.OpExecError("Disk %s is degraded or not fully"
4090                                  " synchronized on target node,"
4091                                  " aborting migrate." % dev.iv_name)
4092
4093     # First get the migration information from the remote node
4094     result = self.rpc.call_migration_info(source_node, instance)
4095     msg = result.RemoteFailMsg()
4096     if msg:
4097       log_err = ("Failed fetching source migration information from %s: %s" %
4098                  (source_node, msg))
4099       logging.error(log_err)
4100       raise errors.OpExecError(log_err)
4101
4102     self.migration_info = migration_info = result.payload
4103
4104     # Then switch the disks to master/master mode
4105     self._EnsureSecondary(target_node)
4106     self._GoStandalone()
4107     self._GoReconnect(True)
4108     self._WaitUntilSync()
4109
4110     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4111     result = self.rpc.call_accept_instance(target_node,
4112                                            instance,
4113                                            migration_info,
4114                                            self.nodes_ip[target_node])
4115
4116     msg = result.RemoteFailMsg()
4117     if msg:
4118       logging.error("Instance pre-migration failed, trying to revert"
4119                     " disk status: %s", msg)
4120       self._AbortMigration()
4121       self._RevertDiskStatus()
4122       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4123                                (instance.name, msg))
4124
4125     self.feedback_fn("* migrating instance to %s" % target_node)
4126     time.sleep(10)
4127     result = self.rpc.call_instance_migrate(source_node, instance,
4128                                             self.nodes_ip[target_node],
4129                                             self.op.live)
4130     msg = result.RemoteFailMsg()
4131     if msg:
4132       logging.error("Instance migration failed, trying to revert"
4133                     " disk status: %s", msg)
4134       self._AbortMigration()
4135       self._RevertDiskStatus()
4136       raise errors.OpExecError("Could not migrate instance %s: %s" %
4137                                (instance.name, msg))
4138     time.sleep(10)
4139
4140     instance.primary_node = target_node
4141     # distribute new instance config to the other nodes
4142     self.cfg.Update(instance)
4143
4144     result = self.rpc.call_finalize_migration(target_node,
4145                                               instance,
4146                                               migration_info,
4147                                               True)
4148     msg = result.RemoteFailMsg()
4149     if msg:
4150       logging.error("Instance migration succeeded, but finalization failed:"
4151                     " %s" % msg)
4152       raise errors.OpExecError("Could not finalize instance migration: %s" %
4153                                msg)
4154
4155     self._EnsureSecondary(source_node)
4156     self._WaitUntilSync()
4157     self._GoStandalone()
4158     self._GoReconnect(False)
4159     self._WaitUntilSync()
4160
4161     self.feedback_fn("* done")
4162
4163   def Exec(self, feedback_fn):
4164     """Perform the migration.
4165
4166     """
4167     self.feedback_fn = feedback_fn
4168
4169     self.source_node = self.instance.primary_node
4170     self.target_node = self.instance.secondary_nodes[0]
4171     self.all_nodes = [self.source_node, self.target_node]
4172     self.nodes_ip = {
4173       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4174       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4175       }
4176     if self.op.cleanup:
4177       return self._ExecCleanup()
4178     else:
4179       return self._ExecMigration()
4180
4181
4182 def _CreateBlockDev(lu, node, instance, device, force_create,
4183                     info, force_open):
4184   """Create a tree of block devices on a given node.
4185
4186   If this device type has to be created on secondaries, create it and
4187   all its children.
4188
4189   If not, just recurse to children keeping the same 'force' value.
4190
4191   @param lu: the lu on whose behalf we execute
4192   @param node: the node on which to create the device
4193   @type instance: L{objects.Instance}
4194   @param instance: the instance which owns the device
4195   @type device: L{objects.Disk}
4196   @param device: the device to create
4197   @type force_create: boolean
4198   @param force_create: whether to force creation of this device; this
4199       will be change to True whenever we find a device which has
4200       CreateOnSecondary() attribute
4201   @param info: the extra 'metadata' we should attach to the device
4202       (this will be represented as a LVM tag)
4203   @type force_open: boolean
4204   @param force_open: this parameter will be passes to the
4205       L{backend.BlockdevCreate} function where it specifies
4206       whether we run on primary or not, and it affects both
4207       the child assembly and the device own Open() execution
4208
4209   """
4210   if device.CreateOnSecondary():
4211     force_create = True
4212
4213   if device.children:
4214     for child in device.children:
4215       _CreateBlockDev(lu, node, instance, child, force_create,
4216                       info, force_open)
4217
4218   if not force_create:
4219     return
4220
4221   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4222
4223
4224 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4225   """Create a single block device on a given node.
4226
4227   This will not recurse over children of the device, so they must be
4228   created in advance.
4229
4230   @param lu: the lu on whose behalf we execute
4231   @param node: the node on which to create the device
4232   @type instance: L{objects.Instance}
4233   @param instance: the instance which owns the device
4234   @type device: L{objects.Disk}
4235   @param device: the device to create
4236   @param info: the extra 'metadata' we should attach to the device
4237       (this will be represented as a LVM tag)
4238   @type force_open: boolean
4239   @param force_open: this parameter will be passes to the
4240       L{backend.BlockdevCreate} function where it specifies
4241       whether we run on primary or not, and it affects both
4242       the child assembly and the device own Open() execution
4243
4244   """
4245   lu.cfg.SetDiskID(device, node)
4246   result = lu.rpc.call_blockdev_create(node, device, device.size,
4247                                        instance.name, force_open, info)
4248   msg = result.RemoteFailMsg()
4249   if msg:
4250     raise errors.OpExecError("Can't create block device %s on"
4251                              " node %s for instance %s: %s" %
4252                              (device, node, instance.name, msg))
4253   if device.physical_id is None:
4254     device.physical_id = result.payload
4255
4256
4257 def _GenerateUniqueNames(lu, exts):
4258   """Generate a suitable LV name.
4259
4260   This will generate a logical volume name for the given instance.
4261
4262   """
4263   results = []
4264   for val in exts:
4265     new_id = lu.cfg.GenerateUniqueID()
4266     results.append("%s%s" % (new_id, val))
4267   return results
4268
4269
4270 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4271                          p_minor, s_minor):
4272   """Generate a drbd8 device complete with its children.
4273
4274   """
4275   port = lu.cfg.AllocatePort()
4276   vgname = lu.cfg.GetVGName()
4277   shared_secret = lu.cfg.GenerateDRBDSecret()
4278   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4279                           logical_id=(vgname, names[0]))
4280   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4281                           logical_id=(vgname, names[1]))
4282   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4283                           logical_id=(primary, secondary, port,
4284                                       p_minor, s_minor,
4285                                       shared_secret),
4286                           children=[dev_data, dev_meta],
4287                           iv_name=iv_name)
4288   return drbd_dev
4289
4290
4291 def _GenerateDiskTemplate(lu, template_name,
4292                           instance_name, primary_node,
4293                           secondary_nodes, disk_info,
4294                           file_storage_dir, file_driver,
4295                           base_index):
4296   """Generate the entire disk layout for a given template type.
4297
4298   """
4299   #TODO: compute space requirements
4300
4301   vgname = lu.cfg.GetVGName()
4302   disk_count = len(disk_info)
4303   disks = []
4304   if template_name == constants.DT_DISKLESS:
4305     pass
4306   elif template_name == constants.DT_PLAIN:
4307     if len(secondary_nodes) != 0:
4308       raise errors.ProgrammerError("Wrong template configuration")
4309
4310     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4311                                       for i in range(disk_count)])
4312     for idx, disk in enumerate(disk_info):
4313       disk_index = idx + base_index
4314       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4315                               logical_id=(vgname, names[idx]),
4316                               iv_name="disk/%d" % disk_index,
4317                               mode=disk["mode"])
4318       disks.append(disk_dev)
4319   elif template_name == constants.DT_DRBD8:
4320     if len(secondary_nodes) != 1:
4321       raise errors.ProgrammerError("Wrong template configuration")
4322     remote_node = secondary_nodes[0]
4323     minors = lu.cfg.AllocateDRBDMinor(
4324       [primary_node, remote_node] * len(disk_info), instance_name)
4325
4326     names = []
4327     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4328                                                for i in range(disk_count)]):
4329       names.append(lv_prefix + "_data")
4330       names.append(lv_prefix + "_meta")
4331     for idx, disk in enumerate(disk_info):
4332       disk_index = idx + base_index
4333       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4334                                       disk["size"], names[idx*2:idx*2+2],
4335                                       "disk/%d" % disk_index,
4336                                       minors[idx*2], minors[idx*2+1])
4337       disk_dev.mode = disk["mode"]
4338       disks.append(disk_dev)
4339   elif template_name == constants.DT_FILE:
4340     if len(secondary_nodes) != 0:
4341       raise errors.ProgrammerError("Wrong template configuration")
4342
4343     for idx, disk in enumerate(disk_info):
4344       disk_index = idx + base_index
4345       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4346                               iv_name="disk/%d" % disk_index,
4347                               logical_id=(file_driver,
4348                                           "%s/disk%d" % (file_storage_dir,
4349                                                          disk_index)),
4350                               mode=disk["mode"])
4351       disks.append(disk_dev)
4352   else:
4353     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4354   return disks
4355
4356
4357 def _GetInstanceInfoText(instance):
4358   """Compute that text that should be added to the disk's metadata.
4359
4360   """
4361   return "originstname+%s" % instance.name
4362
4363
4364 def _CreateDisks(lu, instance):
4365   """Create all disks for an instance.
4366
4367   This abstracts away some work from AddInstance.
4368
4369   @type lu: L{LogicalUnit}
4370   @param lu: the logical unit on whose behalf we execute
4371   @type instance: L{objects.Instance}
4372   @param instance: the instance whose disks we should create
4373   @rtype: boolean
4374   @return: the success of the creation
4375
4376   """
4377   info = _GetInstanceInfoText(instance)
4378   pnode = instance.primary_node
4379
4380   if instance.disk_template == constants.DT_FILE:
4381     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4382     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4383
4384     if result.failed or not result.data:
4385       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4386
4387     if not result.data[0]:
4388       raise errors.OpExecError("Failed to create directory '%s'" %
4389                                file_storage_dir)
4390
4391   # Note: this needs to be kept in sync with adding of disks in
4392   # LUSetInstanceParams
4393   for device in instance.disks:
4394     logging.info("Creating volume %s for instance %s",
4395                  device.iv_name, instance.name)
4396     #HARDCODE
4397     for node in instance.all_nodes:
4398       f_create = node == pnode
4399       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4400
4401
4402 def _RemoveDisks(lu, instance):
4403   """Remove all disks for an instance.
4404
4405   This abstracts away some work from `AddInstance()` and
4406   `RemoveInstance()`. Note that in case some of the devices couldn't
4407   be removed, the removal will continue with the other ones (compare
4408   with `_CreateDisks()`).
4409
4410   @type lu: L{LogicalUnit}
4411   @param lu: the logical unit on whose behalf we execute
4412   @type instance: L{objects.Instance}
4413   @param instance: the instance whose disks we should remove
4414   @rtype: boolean
4415   @return: the success of the removal
4416
4417   """
4418   logging.info("Removing block devices for instance %s", instance.name)
4419
4420   all_result = True
4421   for device in instance.disks:
4422     for node, disk in device.ComputeNodeTree(instance.primary_node):
4423       lu.cfg.SetDiskID(disk, node)
4424       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4425       if msg:
4426         lu.LogWarning("Could not remove block device %s on node %s,"
4427                       " continuing anyway: %s", device.iv_name, node, msg)
4428         all_result = False
4429
4430   if instance.disk_template == constants.DT_FILE:
4431     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4432     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4433                                                  file_storage_dir)
4434     if result.failed or not result.data:
4435       logging.error("Could not remove directory '%s'", file_storage_dir)
4436       all_result = False
4437
4438   return all_result
4439
4440
4441 def _ComputeDiskSize(disk_template, disks):
4442   """Compute disk size requirements in the volume group
4443
4444   """
4445   # Required free disk space as a function of disk and swap space
4446   req_size_dict = {
4447     constants.DT_DISKLESS: None,
4448     constants.DT_PLAIN: sum(d["size"] for d in disks),
4449     # 128 MB are added for drbd metadata for each disk
4450     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4451     constants.DT_FILE: None,
4452   }
4453
4454   if disk_template not in req_size_dict:
4455     raise errors.ProgrammerError("Disk template '%s' size requirement"
4456                                  " is unknown" %  disk_template)
4457
4458   return req_size_dict[disk_template]
4459
4460
4461 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4462   """Hypervisor parameter validation.
4463
4464   This function abstract the hypervisor parameter validation to be
4465   used in both instance create and instance modify.
4466
4467   @type lu: L{LogicalUnit}
4468   @param lu: the logical unit for which we check
4469   @type nodenames: list
4470   @param nodenames: the list of nodes on which we should check
4471   @type hvname: string
4472   @param hvname: the name of the hypervisor we should use
4473   @type hvparams: dict
4474   @param hvparams: the parameters which we need to check
4475   @raise errors.OpPrereqError: if the parameters are not valid
4476
4477   """
4478   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4479                                                   hvname,
4480                                                   hvparams)
4481   for node in nodenames:
4482     info = hvinfo[node]
4483     if info.offline:
4484       continue
4485     msg = info.RemoteFailMsg()
4486     if msg:
4487       raise errors.OpPrereqError("Hypervisor parameter validation"
4488                                  " failed on node %s: %s" % (node, msg))
4489
4490
4491 class LUCreateInstance(LogicalUnit):
4492   """Create an instance.
4493
4494   """
4495   HPATH = "instance-add"
4496   HTYPE = constants.HTYPE_INSTANCE
4497   _OP_REQP = ["instance_name", "disks", "disk_template",
4498               "mode", "start",
4499               "wait_for_sync", "ip_check", "nics",
4500               "hvparams", "beparams"]
4501   REQ_BGL = False
4502
4503   def _ExpandNode(self, node):
4504     """Expands and checks one node name.
4505
4506     """
4507     node_full = self.cfg.ExpandNodeName(node)
4508     if node_full is None:
4509       raise errors.OpPrereqError("Unknown node %s" % node)
4510     return node_full
4511
4512   def ExpandNames(self):
4513     """ExpandNames for CreateInstance.
4514
4515     Figure out the right locks for instance creation.
4516
4517     """
4518     self.needed_locks = {}
4519
4520     # set optional parameters to none if they don't exist
4521     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4522       if not hasattr(self.op, attr):
4523         setattr(self.op, attr, None)
4524
4525     # cheap checks, mostly valid constants given
4526
4527     # verify creation mode
4528     if self.op.mode not in (constants.INSTANCE_CREATE,
4529                             constants.INSTANCE_IMPORT):
4530       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4531                                  self.op.mode)
4532
4533     # disk template and mirror node verification
4534     if self.op.disk_template not in constants.DISK_TEMPLATES:
4535       raise errors.OpPrereqError("Invalid disk template name")
4536
4537     if self.op.hypervisor is None:
4538       self.op.hypervisor = self.cfg.GetHypervisorType()
4539
4540     cluster = self.cfg.GetClusterInfo()
4541     enabled_hvs = cluster.enabled_hypervisors
4542     if self.op.hypervisor not in enabled_hvs:
4543       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4544                                  " cluster (%s)" % (self.op.hypervisor,
4545                                   ",".join(enabled_hvs)))
4546
4547     # check hypervisor parameter syntax (locally)
4548     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4549     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4550                                   self.op.hvparams)
4551     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4552     hv_type.CheckParameterSyntax(filled_hvp)
4553     self.hv_full = filled_hvp
4554
4555     # fill and remember the beparams dict
4556     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4557     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4558                                     self.op.beparams)
4559
4560     #### instance parameters check
4561
4562     # instance name verification
4563     hostname1 = utils.HostInfo(self.op.instance_name)
4564     self.op.instance_name = instance_name = hostname1.name
4565
4566     # this is just a preventive check, but someone might still add this
4567     # instance in the meantime, and creation will fail at lock-add time
4568     if instance_name in self.cfg.GetInstanceList():
4569       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4570                                  instance_name)
4571
4572     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4573
4574     # NIC buildup
4575     self.nics = []
4576     for nic in self.op.nics:
4577       # ip validity checks
4578       ip = nic.get("ip", None)
4579       if ip is None or ip.lower() == "none":
4580         nic_ip = None
4581       elif ip.lower() == constants.VALUE_AUTO:
4582         nic_ip = hostname1.ip
4583       else:
4584         if not utils.IsValidIP(ip):
4585           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4586                                      " like a valid IP" % ip)
4587         nic_ip = ip
4588
4589       # MAC address verification
4590       mac = nic.get("mac", constants.VALUE_AUTO)
4591       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4592         if not utils.IsValidMac(mac.lower()):
4593           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4594                                      mac)
4595       # bridge verification
4596       bridge = nic.get("bridge", None)
4597       if bridge is None:
4598         bridge = self.cfg.GetDefBridge()
4599       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4600
4601     # disk checks/pre-build
4602     self.disks = []
4603     for disk in self.op.disks:
4604       mode = disk.get("mode", constants.DISK_RDWR)
4605       if mode not in constants.DISK_ACCESS_SET:
4606         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4607                                    mode)
4608       size = disk.get("size", None)
4609       if size is None:
4610         raise errors.OpPrereqError("Missing disk size")
4611       try:
4612         size = int(size)
4613       except ValueError:
4614         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4615       self.disks.append({"size": size, "mode": mode})
4616
4617     # used in CheckPrereq for ip ping check
4618     self.check_ip = hostname1.ip
4619
4620     # file storage checks
4621     if (self.op.file_driver and
4622         not self.op.file_driver in constants.FILE_DRIVER):
4623       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4624                                  self.op.file_driver)
4625
4626     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4627       raise errors.OpPrereqError("File storage directory path not absolute")
4628
4629     ### Node/iallocator related checks
4630     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4631       raise errors.OpPrereqError("One and only one of iallocator and primary"
4632                                  " node must be given")
4633
4634     if self.op.iallocator:
4635       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4636     else:
4637       self.op.pnode = self._ExpandNode(self.op.pnode)
4638       nodelist = [self.op.pnode]
4639       if self.op.snode is not None:
4640         self.op.snode = self._ExpandNode(self.op.snode)
4641         nodelist.append(self.op.snode)
4642       self.needed_locks[locking.LEVEL_NODE] = nodelist
4643
4644     # in case of import lock the source node too
4645     if self.op.mode == constants.INSTANCE_IMPORT:
4646       src_node = getattr(self.op, "src_node", None)
4647       src_path = getattr(self.op, "src_path", None)
4648
4649       if src_path is None:
4650         self.op.src_path = src_path = self.op.instance_name
4651
4652       if src_node is None:
4653         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4654         self.op.src_node = None
4655         if os.path.isabs(src_path):
4656           raise errors.OpPrereqError("Importing an instance from an absolute"
4657                                      " path requires a source node option.")
4658       else:
4659         self.op.src_node = src_node = self._ExpandNode(src_node)
4660         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4661           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4662         if not os.path.isabs(src_path):
4663           self.op.src_path = src_path = \
4664             os.path.join(constants.EXPORT_DIR, src_path)
4665
4666     else: # INSTANCE_CREATE
4667       if getattr(self.op, "os_type", None) is None:
4668         raise errors.OpPrereqError("No guest OS specified")
4669
4670   def _RunAllocator(self):
4671     """Run the allocator based on input opcode.
4672
4673     """
4674     nics = [n.ToDict() for n in self.nics]
4675     ial = IAllocator(self,
4676                      mode=constants.IALLOCATOR_MODE_ALLOC,
4677                      name=self.op.instance_name,
4678                      disk_template=self.op.disk_template,
4679                      tags=[],
4680                      os=self.op.os_type,
4681                      vcpus=self.be_full[constants.BE_VCPUS],
4682                      mem_size=self.be_full[constants.BE_MEMORY],
4683                      disks=self.disks,
4684                      nics=nics,
4685                      hypervisor=self.op.hypervisor,
4686                      )
4687
4688     ial.Run(self.op.iallocator)
4689
4690     if not ial.success:
4691       raise errors.OpPrereqError("Can't compute nodes using"
4692                                  " iallocator '%s': %s" % (self.op.iallocator,
4693                                                            ial.info))
4694     if len(ial.nodes) != ial.required_nodes:
4695       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4696                                  " of nodes (%s), required %s" %
4697                                  (self.op.iallocator, len(ial.nodes),
4698                                   ial.required_nodes))
4699     self.op.pnode = ial.nodes[0]
4700     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4701                  self.op.instance_name, self.op.iallocator,
4702                  ", ".join(ial.nodes))
4703     if ial.required_nodes == 2:
4704       self.op.snode = ial.nodes[1]
4705
4706   def BuildHooksEnv(self):
4707     """Build hooks env.
4708
4709     This runs on master, primary and secondary nodes of the instance.
4710
4711     """
4712     env = {
4713       "ADD_MODE": self.op.mode,
4714       }
4715     if self.op.mode == constants.INSTANCE_IMPORT:
4716       env["SRC_NODE"] = self.op.src_node
4717       env["SRC_PATH"] = self.op.src_path
4718       env["SRC_IMAGES"] = self.src_images
4719
4720     env.update(_BuildInstanceHookEnv(
4721       name=self.op.instance_name,
4722       primary_node=self.op.pnode,
4723       secondary_nodes=self.secondaries,
4724       status=self.op.start,
4725       os_type=self.op.os_type,
4726       memory=self.be_full[constants.BE_MEMORY],
4727       vcpus=self.be_full[constants.BE_VCPUS],
4728       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4729       disk_template=self.op.disk_template,
4730       disks=[(d["size"], d["mode"]) for d in self.disks],
4731       bep=self.be_full,
4732       hvp=self.hv_full,
4733       hypervisor_name=self.op.hypervisor,
4734     ))
4735
4736     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4737           self.secondaries)
4738     return env, nl, nl
4739
4740
4741   def CheckPrereq(self):
4742     """Check prerequisites.
4743
4744     """
4745     if (not self.cfg.GetVGName() and
4746         self.op.disk_template not in constants.DTS_NOT_LVM):
4747       raise errors.OpPrereqError("Cluster does not support lvm-based"
4748                                  " instances")
4749
4750     if self.op.mode == constants.INSTANCE_IMPORT:
4751       src_node = self.op.src_node
4752       src_path = self.op.src_path
4753
4754       if src_node is None:
4755         exp_list = self.rpc.call_export_list(
4756           self.acquired_locks[locking.LEVEL_NODE])
4757         found = False
4758         for node in exp_list:
4759           if not exp_list[node].failed and src_path in exp_list[node].data:
4760             found = True
4761             self.op.src_node = src_node = node
4762             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4763                                                        src_path)
4764             break
4765         if not found:
4766           raise errors.OpPrereqError("No export found for relative path %s" %
4767                                       src_path)
4768
4769       _CheckNodeOnline(self, src_node)
4770       result = self.rpc.call_export_info(src_node, src_path)
4771       result.Raise()
4772       if not result.data:
4773         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4774
4775       export_info = result.data
4776       if not export_info.has_section(constants.INISECT_EXP):
4777         raise errors.ProgrammerError("Corrupted export config")
4778
4779       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4780       if (int(ei_version) != constants.EXPORT_VERSION):
4781         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4782                                    (ei_version, constants.EXPORT_VERSION))
4783
4784       # Check that the new instance doesn't have less disks than the export
4785       instance_disks = len(self.disks)
4786       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4787       if instance_disks < export_disks:
4788         raise errors.OpPrereqError("Not enough disks to import."
4789                                    " (instance: %d, export: %d)" %
4790                                    (instance_disks, export_disks))
4791
4792       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4793       disk_images = []
4794       for idx in range(export_disks):
4795         option = 'disk%d_dump' % idx
4796         if export_info.has_option(constants.INISECT_INS, option):
4797           # FIXME: are the old os-es, disk sizes, etc. useful?
4798           export_name = export_info.get(constants.INISECT_INS, option)
4799           image = os.path.join(src_path, export_name)
4800           disk_images.append(image)
4801         else:
4802           disk_images.append(False)
4803
4804       self.src_images = disk_images
4805
4806       old_name = export_info.get(constants.INISECT_INS, 'name')
4807       # FIXME: int() here could throw a ValueError on broken exports
4808       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4809       if self.op.instance_name == old_name:
4810         for idx, nic in enumerate(self.nics):
4811           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4812             nic_mac_ini = 'nic%d_mac' % idx
4813             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4814
4815     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4816     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4817     if self.op.start and not self.op.ip_check:
4818       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4819                                  " adding an instance in start mode")
4820
4821     if self.op.ip_check:
4822       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4823         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4824                                    (self.check_ip, self.op.instance_name))
4825
4826     #### mac address generation
4827     # By generating here the mac address both the allocator and the hooks get
4828     # the real final mac address rather than the 'auto' or 'generate' value.
4829     # There is a race condition between the generation and the instance object
4830     # creation, which means that we know the mac is valid now, but we're not
4831     # sure it will be when we actually add the instance. If things go bad
4832     # adding the instance will abort because of a duplicate mac, and the
4833     # creation job will fail.
4834     for nic in self.nics:
4835       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4836         nic.mac = self.cfg.GenerateMAC()
4837
4838     #### allocator run
4839
4840     if self.op.iallocator is not None:
4841       self._RunAllocator()
4842
4843     #### node related checks
4844
4845     # check primary node
4846     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4847     assert self.pnode is not None, \
4848       "Cannot retrieve locked node %s" % self.op.pnode
4849     if pnode.offline:
4850       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4851                                  pnode.name)
4852     if pnode.drained:
4853       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4854                                  pnode.name)
4855
4856     self.secondaries = []
4857
4858     # mirror node verification
4859     if self.op.disk_template in constants.DTS_NET_MIRROR:
4860       if self.op.snode is None:
4861         raise errors.OpPrereqError("The networked disk templates need"
4862                                    " a mirror node")
4863       if self.op.snode == pnode.name:
4864         raise errors.OpPrereqError("The secondary node cannot be"
4865                                    " the primary node.")
4866       _CheckNodeOnline(self, self.op.snode)
4867       _CheckNodeNotDrained(self, self.op.snode)
4868       self.secondaries.append(self.op.snode)
4869
4870     nodenames = [pnode.name] + self.secondaries
4871
4872     req_size = _ComputeDiskSize(self.op.disk_template,
4873                                 self.disks)
4874
4875     # Check lv size requirements
4876     if req_size is not None:
4877       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4878                                          self.op.hypervisor)
4879       for node in nodenames:
4880         info = nodeinfo[node]
4881         info.Raise()
4882         info = info.data
4883         if not info:
4884           raise errors.OpPrereqError("Cannot get current information"
4885                                      " from node '%s'" % node)
4886         vg_free = info.get('vg_free', None)
4887         if not isinstance(vg_free, int):
4888           raise errors.OpPrereqError("Can't compute free disk space on"
4889                                      " node %s" % node)
4890         if req_size > info['vg_free']:
4891           raise errors.OpPrereqError("Not enough disk space on target node %s."
4892                                      " %d MB available, %d MB required" %
4893                                      (node, info['vg_free'], req_size))
4894
4895     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4896
4897     # os verification
4898     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4899     result.Raise()
4900     if not isinstance(result.data, objects.OS) or not result.data:
4901       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4902                                  " primary node"  % self.op.os_type)
4903
4904     # bridge check on primary node
4905     bridges = [n.bridge for n in self.nics]
4906     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4907     result.Raise()
4908     if not result.data:
4909       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4910                                  " exist on destination node '%s'" %
4911                                  (",".join(bridges), pnode.name))
4912
4913     # memory check on primary node
4914     if self.op.start:
4915       _CheckNodeFreeMemory(self, self.pnode.name,
4916                            "creating instance %s" % self.op.instance_name,
4917                            self.be_full[constants.BE_MEMORY],
4918                            self.op.hypervisor)
4919
4920   def Exec(self, feedback_fn):
4921     """Create and add the instance to the cluster.
4922
4923     """
4924     instance = self.op.instance_name
4925     pnode_name = self.pnode.name
4926
4927     ht_kind = self.op.hypervisor
4928     if ht_kind in constants.HTS_REQ_PORT:
4929       network_port = self.cfg.AllocatePort()
4930     else:
4931       network_port = None
4932
4933     ##if self.op.vnc_bind_address is None:
4934     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4935
4936     # this is needed because os.path.join does not accept None arguments
4937     if self.op.file_storage_dir is None:
4938       string_file_storage_dir = ""
4939     else:
4940       string_file_storage_dir = self.op.file_storage_dir
4941
4942     # build the full file storage dir path
4943     file_storage_dir = os.path.normpath(os.path.join(
4944                                         self.cfg.GetFileStorageDir(),
4945                                         string_file_storage_dir, instance))
4946
4947
4948     disks = _GenerateDiskTemplate(self,
4949                                   self.op.disk_template,
4950                                   instance, pnode_name,
4951                                   self.secondaries,
4952                                   self.disks,
4953                                   file_storage_dir,
4954                                   self.op.file_driver,
4955                                   0)
4956
4957     iobj = objects.Instance(name=instance, os=self.op.os_type,
4958                             primary_node=pnode_name,
4959                             nics=self.nics, disks=disks,
4960                             disk_template=self.op.disk_template,
4961                             admin_up=False,
4962                             network_port=network_port,
4963                             beparams=self.op.beparams,
4964                             hvparams=self.op.hvparams,
4965                             hypervisor=self.op.hypervisor,
4966                             )
4967
4968     feedback_fn("* creating instance disks...")
4969     try:
4970       _CreateDisks(self, iobj)
4971     except errors.OpExecError:
4972       self.LogWarning("Device creation failed, reverting...")
4973       try:
4974         _RemoveDisks(self, iobj)
4975       finally:
4976         self.cfg.ReleaseDRBDMinors(instance)
4977         raise
4978
4979     feedback_fn("adding instance %s to cluster config" % instance)
4980
4981     self.cfg.AddInstance(iobj)
4982     # Declare that we don't want to remove the instance lock anymore, as we've
4983     # added the instance to the config
4984     del self.remove_locks[locking.LEVEL_INSTANCE]
4985     # Unlock all the nodes
4986     if self.op.mode == constants.INSTANCE_IMPORT:
4987       nodes_keep = [self.op.src_node]
4988       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4989                        if node != self.op.src_node]
4990       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4991       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4992     else:
4993       self.context.glm.release(locking.LEVEL_NODE)
4994       del self.acquired_locks[locking.LEVEL_NODE]
4995
4996     if self.op.wait_for_sync:
4997       disk_abort = not _WaitForSync(self, iobj)
4998     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4999       # make sure the disks are not degraded (still sync-ing is ok)
5000       time.sleep(15)
5001       feedback_fn("* checking mirrors status")
5002       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5003     else:
5004       disk_abort = False
5005
5006     if disk_abort:
5007       _RemoveDisks(self, iobj)
5008       self.cfg.RemoveInstance(iobj.name)
5009       # Make sure the instance lock gets removed
5010       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5011       raise errors.OpExecError("There are some degraded disks for"
5012                                " this instance")
5013
5014     feedback_fn("creating os for instance %s on node %s" %
5015                 (instance, pnode_name))
5016
5017     if iobj.disk_template != constants.DT_DISKLESS:
5018       if self.op.mode == constants.INSTANCE_CREATE:
5019         feedback_fn("* running the instance OS create scripts...")
5020         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5021         msg = result.RemoteFailMsg()
5022         if msg:
5023           raise errors.OpExecError("Could not add os for instance %s"
5024                                    " on node %s: %s" %
5025                                    (instance, pnode_name, msg))
5026
5027       elif self.op.mode == constants.INSTANCE_IMPORT:
5028         feedback_fn("* running the instance OS import scripts...")
5029         src_node = self.op.src_node
5030         src_images = self.src_images
5031         cluster_name = self.cfg.GetClusterName()
5032         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5033                                                          src_node, src_images,
5034                                                          cluster_name)
5035         import_result.Raise()
5036         for idx, result in enumerate(import_result.data):
5037           if not result:
5038             self.LogWarning("Could not import the image %s for instance"
5039                             " %s, disk %d, on node %s" %
5040                             (src_images[idx], instance, idx, pnode_name))
5041       else:
5042         # also checked in the prereq part
5043         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5044                                      % self.op.mode)
5045
5046     if self.op.start:
5047       iobj.admin_up = True
5048       self.cfg.Update(iobj)
5049       logging.info("Starting instance %s on node %s", instance, pnode_name)
5050       feedback_fn("* starting instance...")
5051       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5052       msg = result.RemoteFailMsg()
5053       if msg:
5054         raise errors.OpExecError("Could not start instance: %s" % msg)
5055
5056
5057 class LUConnectConsole(NoHooksLU):
5058   """Connect to an instance's console.
5059
5060   This is somewhat special in that it returns the command line that
5061   you need to run on the master node in order to connect to the
5062   console.
5063
5064   """
5065   _OP_REQP = ["instance_name"]
5066   REQ_BGL = False
5067
5068   def ExpandNames(self):
5069     self._ExpandAndLockInstance()
5070
5071   def CheckPrereq(self):
5072     """Check prerequisites.
5073
5074     This checks that the instance is in the cluster.
5075
5076     """
5077     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5078     assert self.instance is not None, \
5079       "Cannot retrieve locked instance %s" % self.op.instance_name
5080     _CheckNodeOnline(self, self.instance.primary_node)
5081
5082   def Exec(self, feedback_fn):
5083     """Connect to the console of an instance
5084
5085     """
5086     instance = self.instance
5087     node = instance.primary_node
5088
5089     node_insts = self.rpc.call_instance_list([node],
5090                                              [instance.hypervisor])[node]
5091     node_insts.Raise()
5092
5093     if instance.name not in node_insts.data:
5094       raise errors.OpExecError("Instance %s is not running." % instance.name)
5095
5096     logging.debug("Connecting to console of %s on %s", instance.name, node)
5097
5098     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5099     cluster = self.cfg.GetClusterInfo()
5100     # beparams and hvparams are passed separately, to avoid editing the
5101     # instance and then saving the defaults in the instance itself.
5102     hvparams = cluster.FillHV(instance)
5103     beparams = cluster.FillBE(instance)
5104     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5105
5106     # build ssh cmdline
5107     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5108
5109
5110 class LUReplaceDisks(LogicalUnit):
5111   """Replace the disks of an instance.
5112
5113   """
5114   HPATH = "mirrors-replace"
5115   HTYPE = constants.HTYPE_INSTANCE
5116   _OP_REQP = ["instance_name", "mode", "disks"]
5117   REQ_BGL = False
5118
5119   def CheckArguments(self):
5120     if not hasattr(self.op, "remote_node"):
5121       self.op.remote_node = None
5122     if not hasattr(self.op, "iallocator"):
5123       self.op.iallocator = None
5124
5125     # check for valid parameter combination
5126     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5127     if self.op.mode == constants.REPLACE_DISK_CHG:
5128       if cnt == 2:
5129         raise errors.OpPrereqError("When changing the secondary either an"
5130                                    " iallocator script must be used or the"
5131                                    " new node given")
5132       elif cnt == 0:
5133         raise errors.OpPrereqError("Give either the iallocator or the new"
5134                                    " secondary, not both")
5135     else: # not replacing the secondary
5136       if cnt != 2:
5137         raise errors.OpPrereqError("The iallocator and new node options can"
5138                                    " be used only when changing the"
5139                                    " secondary node")
5140
5141   def ExpandNames(self):
5142     self._ExpandAndLockInstance()
5143
5144     if self.op.iallocator is not None:
5145       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5146     elif self.op.remote_node is not None:
5147       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5148       if remote_node is None:
5149         raise errors.OpPrereqError("Node '%s' not known" %
5150                                    self.op.remote_node)
5151       self.op.remote_node = remote_node
5152       # Warning: do not remove the locking of the new secondary here
5153       # unless DRBD8.AddChildren is changed to work in parallel;
5154       # currently it doesn't since parallel invocations of
5155       # FindUnusedMinor will conflict
5156       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5157       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5158     else:
5159       self.needed_locks[locking.LEVEL_NODE] = []
5160       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5161
5162   def DeclareLocks(self, level):
5163     # If we're not already locking all nodes in the set we have to declare the
5164     # instance's primary/secondary nodes.
5165     if (level == locking.LEVEL_NODE and
5166         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5167       self._LockInstancesNodes()
5168
5169   def _RunAllocator(self):
5170     """Compute a new secondary node using an IAllocator.
5171
5172     """
5173     ial = IAllocator(self,
5174                      mode=constants.IALLOCATOR_MODE_RELOC,
5175                      name=self.op.instance_name,
5176                      relocate_from=[self.sec_node])
5177
5178     ial.Run(self.op.iallocator)
5179
5180     if not ial.success:
5181       raise errors.OpPrereqError("Can't compute nodes using"
5182                                  " iallocator '%s': %s" % (self.op.iallocator,
5183                                                            ial.info))
5184     if len(ial.nodes) != ial.required_nodes:
5185       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5186                                  " of nodes (%s), required %s" %
5187                                  (len(ial.nodes), ial.required_nodes))
5188     self.op.remote_node = ial.nodes[0]
5189     self.LogInfo("Selected new secondary for the instance: %s",
5190                  self.op.remote_node)
5191
5192   def BuildHooksEnv(self):
5193     """Build hooks env.
5194
5195     This runs on the master, the primary and all the secondaries.
5196
5197     """
5198     env = {
5199       "MODE": self.op.mode,
5200       "NEW_SECONDARY": self.op.remote_node,
5201       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5202       }
5203     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5204     nl = [
5205       self.cfg.GetMasterNode(),
5206       self.instance.primary_node,
5207       ]
5208     if self.op.remote_node is not None:
5209       nl.append(self.op.remote_node)
5210     return env, nl, nl
5211
5212   def CheckPrereq(self):
5213     """Check prerequisites.
5214
5215     This checks that the instance is in the cluster.
5216
5217     """
5218     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5219     assert instance is not None, \
5220       "Cannot retrieve locked instance %s" % self.op.instance_name
5221     self.instance = instance
5222
5223     if instance.disk_template != constants.DT_DRBD8:
5224       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5225                                  " instances")
5226
5227     if len(instance.secondary_nodes) != 1:
5228       raise errors.OpPrereqError("The instance has a strange layout,"
5229                                  " expected one secondary but found %d" %
5230                                  len(instance.secondary_nodes))
5231
5232     self.sec_node = instance.secondary_nodes[0]
5233
5234     if self.op.iallocator is not None:
5235       self._RunAllocator()
5236
5237     remote_node = self.op.remote_node
5238     if remote_node is not None:
5239       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5240       assert self.remote_node_info is not None, \
5241         "Cannot retrieve locked node %s" % remote_node
5242     else:
5243       self.remote_node_info = None
5244     if remote_node == instance.primary_node:
5245       raise errors.OpPrereqError("The specified node is the primary node of"
5246                                  " the instance.")
5247     elif remote_node == self.sec_node:
5248       raise errors.OpPrereqError("The specified node is already the"
5249                                  " secondary node of the instance.")
5250
5251     if self.op.mode == constants.REPLACE_DISK_PRI:
5252       n1 = self.tgt_node = instance.primary_node
5253       n2 = self.oth_node = self.sec_node
5254     elif self.op.mode == constants.REPLACE_DISK_SEC:
5255       n1 = self.tgt_node = self.sec_node
5256       n2 = self.oth_node = instance.primary_node
5257     elif self.op.mode == constants.REPLACE_DISK_CHG:
5258       n1 = self.new_node = remote_node
5259       n2 = self.oth_node = instance.primary_node
5260       self.tgt_node = self.sec_node
5261       _CheckNodeNotDrained(self, remote_node)
5262     else:
5263       raise errors.ProgrammerError("Unhandled disk replace mode")
5264
5265     _CheckNodeOnline(self, n1)
5266     _CheckNodeOnline(self, n2)
5267
5268     if not self.op.disks:
5269       self.op.disks = range(len(instance.disks))
5270
5271     for disk_idx in self.op.disks:
5272       instance.FindDisk(disk_idx)
5273
5274   def _ExecD8DiskOnly(self, feedback_fn):
5275     """Replace a disk on the primary or secondary for dbrd8.
5276
5277     The algorithm for replace is quite complicated:
5278
5279       1. for each disk to be replaced:
5280
5281         1. create new LVs on the target node with unique names
5282         1. detach old LVs from the drbd device
5283         1. rename old LVs to name_replaced.<time_t>
5284         1. rename new LVs to old LVs
5285         1. attach the new LVs (with the old names now) to the drbd device
5286
5287       1. wait for sync across all devices
5288
5289       1. for each modified disk:
5290
5291         1. remove old LVs (which have the name name_replaces.<time_t>)
5292
5293     Failures are not very well handled.
5294
5295     """
5296     steps_total = 6
5297     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5298     instance = self.instance
5299     iv_names = {}
5300     vgname = self.cfg.GetVGName()
5301     # start of work
5302     cfg = self.cfg
5303     tgt_node = self.tgt_node
5304     oth_node = self.oth_node
5305
5306     # Step: check device activation
5307     self.proc.LogStep(1, steps_total, "check device existence")
5308     info("checking volume groups")
5309     my_vg = cfg.GetVGName()
5310     results = self.rpc.call_vg_list([oth_node, tgt_node])
5311     if not results:
5312       raise errors.OpExecError("Can't list volume groups on the nodes")
5313     for node in oth_node, tgt_node:
5314       res = results[node]
5315       if res.failed or not res.data or my_vg not in res.data:
5316         raise errors.OpExecError("Volume group '%s' not found on %s" %
5317                                  (my_vg, node))
5318     for idx, dev in enumerate(instance.disks):
5319       if idx not in self.op.disks:
5320         continue
5321       for node in tgt_node, oth_node:
5322         info("checking disk/%d on %s" % (idx, node))
5323         cfg.SetDiskID(dev, node)
5324         result = self.rpc.call_blockdev_find(node, dev)
5325         msg = result.RemoteFailMsg()
5326         if not msg and not result.payload:
5327           msg = "disk not found"
5328         if msg:
5329           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5330                                    (idx, node, msg))
5331
5332     # Step: check other node consistency
5333     self.proc.LogStep(2, steps_total, "check peer consistency")
5334     for idx, dev in enumerate(instance.disks):
5335       if idx not in self.op.disks:
5336         continue
5337       info("checking disk/%d consistency on %s" % (idx, oth_node))
5338       if not _CheckDiskConsistency(self, dev, oth_node,
5339                                    oth_node==instance.primary_node):
5340         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5341                                  " to replace disks on this node (%s)" %
5342                                  (oth_node, tgt_node))
5343
5344     # Step: create new storage
5345     self.proc.LogStep(3, steps_total, "allocate new storage")
5346     for idx, dev in enumerate(instance.disks):
5347       if idx not in self.op.disks:
5348         continue
5349       size = dev.size
5350       cfg.SetDiskID(dev, tgt_node)
5351       lv_names = [".disk%d_%s" % (idx, suf)
5352                   for suf in ["data", "meta"]]
5353       names = _GenerateUniqueNames(self, lv_names)
5354       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5355                              logical_id=(vgname, names[0]))
5356       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5357                              logical_id=(vgname, names[1]))
5358       new_lvs = [lv_data, lv_meta]
5359       old_lvs = dev.children
5360       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5361       info("creating new local storage on %s for %s" %
5362            (tgt_node, dev.iv_name))
5363       # we pass force_create=True to force the LVM creation
5364       for new_lv in new_lvs:
5365         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5366                         _GetInstanceInfoText(instance), False)
5367
5368     # Step: for each lv, detach+rename*2+attach
5369     self.proc.LogStep(4, steps_total, "change drbd configuration")
5370     for dev, old_lvs, new_lvs in iv_names.itervalues():
5371       info("detaching %s drbd from local storage" % dev.iv_name)
5372       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5373       result.Raise()
5374       if not result.data:
5375         raise errors.OpExecError("Can't detach drbd from local storage on node"
5376                                  " %s for device %s" % (tgt_node, dev.iv_name))
5377       #dev.children = []
5378       #cfg.Update(instance)
5379
5380       # ok, we created the new LVs, so now we know we have the needed
5381       # storage; as such, we proceed on the target node to rename
5382       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5383       # using the assumption that logical_id == physical_id (which in
5384       # turn is the unique_id on that node)
5385
5386       # FIXME(iustin): use a better name for the replaced LVs
5387       temp_suffix = int(time.time())
5388       ren_fn = lambda d, suff: (d.physical_id[0],
5389                                 d.physical_id[1] + "_replaced-%s" % suff)
5390       # build the rename list based on what LVs exist on the node
5391       rlist = []
5392       for to_ren in old_lvs:
5393         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5394         if not result.RemoteFailMsg() and result.payload:
5395           # device exists
5396           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5397
5398       info("renaming the old LVs on the target node")
5399       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5400       result.Raise()
5401       if not result.data:
5402         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5403       # now we rename the new LVs to the old LVs
5404       info("renaming the new LVs on the target node")
5405       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5406       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5407       result.Raise()
5408       if not result.data:
5409         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5410
5411       for old, new in zip(old_lvs, new_lvs):
5412         new.logical_id = old.logical_id
5413         cfg.SetDiskID(new, tgt_node)
5414
5415       for disk in old_lvs:
5416         disk.logical_id = ren_fn(disk, temp_suffix)
5417         cfg.SetDiskID(disk, tgt_node)
5418
5419       # now that the new lvs have the old name, we can add them to the device
5420       info("adding new mirror component on %s" % tgt_node)
5421       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5422       if result.failed or not result.data:
5423         for new_lv in new_lvs:
5424           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5425           if msg:
5426             warning("Can't rollback device %s: %s", dev, msg,
5427                     hint="cleanup manually the unused logical volumes")
5428         raise errors.OpExecError("Can't add local storage to drbd")
5429
5430       dev.children = new_lvs
5431       cfg.Update(instance)
5432
5433     # Step: wait for sync
5434
5435     # this can fail as the old devices are degraded and _WaitForSync
5436     # does a combined result over all disks, so we don't check its
5437     # return value
5438     self.proc.LogStep(5, steps_total, "sync devices")
5439     _WaitForSync(self, instance, unlock=True)
5440
5441     # so check manually all the devices
5442     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5443       cfg.SetDiskID(dev, instance.primary_node)
5444       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5445       msg = result.RemoteFailMsg()
5446       if not msg and not result.payload:
5447         msg = "disk not found"
5448       if msg:
5449         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5450                                  (name, msg))
5451       if result.payload[5]:
5452         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5453
5454     # Step: remove old storage
5455     self.proc.LogStep(6, steps_total, "removing old storage")
5456     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5457       info("remove logical volumes for %s" % name)
5458       for lv in old_lvs:
5459         cfg.SetDiskID(lv, tgt_node)
5460         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5461         if msg:
5462           warning("Can't remove old LV: %s" % msg,
5463                   hint="manually remove unused LVs")
5464           continue
5465
5466   def _ExecD8Secondary(self, feedback_fn):
5467     """Replace the secondary node for drbd8.
5468
5469     The algorithm for replace is quite complicated:
5470       - for all disks of the instance:
5471         - create new LVs on the new node with same names
5472         - shutdown the drbd device on the old secondary
5473         - disconnect the drbd network on the primary
5474         - create the drbd device on the new secondary
5475         - network attach the drbd on the primary, using an artifice:
5476           the drbd code for Attach() will connect to the network if it
5477           finds a device which is connected to the good local disks but
5478           not network enabled
5479       - wait for sync across all devices
5480       - remove all disks from the old secondary
5481
5482     Failures are not very well handled.
5483
5484     """
5485     steps_total = 6
5486     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5487     instance = self.instance
5488     iv_names = {}
5489     # start of work
5490     cfg = self.cfg
5491     old_node = self.tgt_node
5492     new_node = self.new_node
5493     pri_node = instance.primary_node
5494     nodes_ip = {
5495       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5496       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5497       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5498       }
5499
5500     # Step: check device activation
5501     self.proc.LogStep(1, steps_total, "check device existence")
5502     info("checking volume groups")
5503     my_vg = cfg.GetVGName()
5504     results = self.rpc.call_vg_list([pri_node, new_node])
5505     for node in pri_node, new_node:
5506       res = results[node]
5507       if res.failed or not res.data or my_vg not in res.data:
5508         raise errors.OpExecError("Volume group '%s' not found on %s" %
5509                                  (my_vg, node))
5510     for idx, dev in enumerate(instance.disks):
5511       if idx not in self.op.disks:
5512         continue
5513       info("checking disk/%d on %s" % (idx, pri_node))
5514       cfg.SetDiskID(dev, pri_node)
5515       result = self.rpc.call_blockdev_find(pri_node, dev)
5516       msg = result.RemoteFailMsg()
5517       if not msg and not result.payload:
5518         msg = "disk not found"
5519       if msg:
5520         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5521                                  (idx, pri_node, msg))
5522
5523     # Step: check other node consistency
5524     self.proc.LogStep(2, steps_total, "check peer consistency")
5525     for idx, dev in enumerate(instance.disks):
5526       if idx not in self.op.disks:
5527         continue
5528       info("checking disk/%d consistency on %s" % (idx, pri_node))
5529       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5530         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5531                                  " unsafe to replace the secondary" %
5532                                  pri_node)
5533
5534     # Step: create new storage
5535     self.proc.LogStep(3, steps_total, "allocate new storage")
5536     for idx, dev in enumerate(instance.disks):
5537       info("adding new local storage on %s for disk/%d" %
5538            (new_node, idx))
5539       # we pass force_create=True to force LVM creation
5540       for new_lv in dev.children:
5541         _CreateBlockDev(self, new_node, instance, new_lv, True,
5542                         _GetInstanceInfoText(instance), False)
5543
5544     # Step 4: dbrd minors and drbd setups changes
5545     # after this, we must manually remove the drbd minors on both the
5546     # error and the success paths
5547     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5548                                    instance.name)
5549     logging.debug("Allocated minors %s" % (minors,))
5550     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5551     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5552       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5553       # create new devices on new_node; note that we create two IDs:
5554       # one without port, so the drbd will be activated without
5555       # networking information on the new node at this stage, and one
5556       # with network, for the latter activation in step 4
5557       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5558       if pri_node == o_node1:
5559         p_minor = o_minor1
5560       else:
5561         p_minor = o_minor2
5562
5563       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5564       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5565
5566       iv_names[idx] = (dev, dev.children, new_net_id)
5567       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5568                     new_net_id)
5569       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5570                               logical_id=new_alone_id,
5571                               children=dev.children,
5572                               size=dev.size)
5573       try:
5574         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5575                               _GetInstanceInfoText(instance), False)
5576       except errors.GenericError:
5577         self.cfg.ReleaseDRBDMinors(instance.name)
5578         raise
5579
5580     for idx, dev in enumerate(instance.disks):
5581       # we have new devices, shutdown the drbd on the old secondary
5582       info("shutting down drbd for disk/%d on old node" % idx)
5583       cfg.SetDiskID(dev, old_node)
5584       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5585       if msg:
5586         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5587                 (idx, msg),
5588                 hint="Please cleanup this device manually as soon as possible")
5589
5590     info("detaching primary drbds from the network (=> standalone)")
5591     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5592                                                instance.disks)[pri_node]
5593
5594     msg = result.RemoteFailMsg()
5595     if msg:
5596       # detaches didn't succeed (unlikely)
5597       self.cfg.ReleaseDRBDMinors(instance.name)
5598       raise errors.OpExecError("Can't detach the disks from the network on"
5599                                " old node: %s" % (msg,))
5600
5601     # if we managed to detach at least one, we update all the disks of
5602     # the instance to point to the new secondary
5603     info("updating instance configuration")
5604     for dev, _, new_logical_id in iv_names.itervalues():
5605       dev.logical_id = new_logical_id
5606       cfg.SetDiskID(dev, pri_node)
5607     cfg.Update(instance)
5608
5609     # and now perform the drbd attach
5610     info("attaching primary drbds to new secondary (standalone => connected)")
5611     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5612                                            instance.disks, instance.name,
5613                                            False)
5614     for to_node, to_result in result.items():
5615       msg = to_result.RemoteFailMsg()
5616       if msg:
5617         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5618                 hint="please do a gnt-instance info to see the"
5619                 " status of disks")
5620
5621     # this can fail as the old devices are degraded and _WaitForSync
5622     # does a combined result over all disks, so we don't check its
5623     # return value
5624     self.proc.LogStep(5, steps_total, "sync devices")
5625     _WaitForSync(self, instance, unlock=True)
5626
5627     # so check manually all the devices
5628     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5629       cfg.SetDiskID(dev, pri_node)
5630       result = self.rpc.call_blockdev_find(pri_node, dev)
5631       msg = result.RemoteFailMsg()
5632       if not msg and not result.payload:
5633         msg = "disk not found"
5634       if msg:
5635         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5636                                  (idx, msg))
5637       if result.payload[5]:
5638         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5639
5640     self.proc.LogStep(6, steps_total, "removing old storage")
5641     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5642       info("remove logical volumes for disk/%d" % idx)
5643       for lv in old_lvs:
5644         cfg.SetDiskID(lv, old_node)
5645         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5646         if msg:
5647           warning("Can't remove LV on old secondary: %s", msg,
5648                   hint="Cleanup stale volumes by hand")
5649
5650   def Exec(self, feedback_fn):
5651     """Execute disk replacement.
5652
5653     This dispatches the disk replacement to the appropriate handler.
5654
5655     """
5656     instance = self.instance
5657
5658     # Activate the instance disks if we're replacing them on a down instance
5659     if not instance.admin_up:
5660       _StartInstanceDisks(self, instance, True)
5661
5662     if self.op.mode == constants.REPLACE_DISK_CHG:
5663       fn = self._ExecD8Secondary
5664     else:
5665       fn = self._ExecD8DiskOnly
5666
5667     ret = fn(feedback_fn)
5668
5669     # Deactivate the instance disks if we're replacing them on a down instance
5670     if not instance.admin_up:
5671       _SafeShutdownInstanceDisks(self, instance)
5672
5673     return ret
5674
5675
5676 class LUGrowDisk(LogicalUnit):
5677   """Grow a disk of an instance.
5678
5679   """
5680   HPATH = "disk-grow"
5681   HTYPE = constants.HTYPE_INSTANCE
5682   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5683   REQ_BGL = False
5684
5685   def ExpandNames(self):
5686     self._ExpandAndLockInstance()
5687     self.needed_locks[locking.LEVEL_NODE] = []
5688     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5689
5690   def DeclareLocks(self, level):
5691     if level == locking.LEVEL_NODE:
5692       self._LockInstancesNodes()
5693
5694   def BuildHooksEnv(self):
5695     """Build hooks env.
5696
5697     This runs on the master, the primary and all the secondaries.
5698
5699     """
5700     env = {
5701       "DISK": self.op.disk,
5702       "AMOUNT": self.op.amount,
5703       }
5704     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5705     nl = [
5706       self.cfg.GetMasterNode(),
5707       self.instance.primary_node,
5708       ]
5709     return env, nl, nl
5710
5711   def CheckPrereq(self):
5712     """Check prerequisites.
5713
5714     This checks that the instance is in the cluster.
5715
5716     """
5717     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5718     assert instance is not None, \
5719       "Cannot retrieve locked instance %s" % self.op.instance_name
5720     nodenames = list(instance.all_nodes)
5721     for node in nodenames:
5722       _CheckNodeOnline(self, node)
5723
5724
5725     self.instance = instance
5726
5727     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5728       raise errors.OpPrereqError("Instance's disk layout does not support"
5729                                  " growing.")
5730
5731     self.disk = instance.FindDisk(self.op.disk)
5732
5733     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5734                                        instance.hypervisor)
5735     for node in nodenames:
5736       info = nodeinfo[node]
5737       if info.failed or not info.data:
5738         raise errors.OpPrereqError("Cannot get current information"
5739                                    " from node '%s'" % node)
5740       vg_free = info.data.get('vg_free', None)
5741       if not isinstance(vg_free, int):
5742         raise errors.OpPrereqError("Can't compute free disk space on"
5743                                    " node %s" % node)
5744       if self.op.amount > vg_free:
5745         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5746                                    " %d MiB available, %d MiB required" %
5747                                    (node, vg_free, self.op.amount))
5748
5749   def Exec(self, feedback_fn):
5750     """Execute disk grow.
5751
5752     """
5753     instance = self.instance
5754     disk = self.disk
5755     for node in instance.all_nodes:
5756       self.cfg.SetDiskID(disk, node)
5757       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5758       msg = result.RemoteFailMsg()
5759       if msg:
5760         raise errors.OpExecError("Grow request failed to node %s: %s" %
5761                                  (node, msg))
5762     disk.RecordGrow(self.op.amount)
5763     self.cfg.Update(instance)
5764     if self.op.wait_for_sync:
5765       disk_abort = not _WaitForSync(self, instance)
5766       if disk_abort:
5767         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5768                              " status.\nPlease check the instance.")
5769
5770
5771 class LUQueryInstanceData(NoHooksLU):
5772   """Query runtime instance data.
5773
5774   """
5775   _OP_REQP = ["instances", "static"]
5776   REQ_BGL = False
5777
5778   def ExpandNames(self):
5779     self.needed_locks = {}
5780     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5781
5782     if not isinstance(self.op.instances, list):
5783       raise errors.OpPrereqError("Invalid argument type 'instances'")
5784
5785     if self.op.instances:
5786       self.wanted_names = []
5787       for name in self.op.instances:
5788         full_name = self.cfg.ExpandInstanceName(name)
5789         if full_name is None:
5790           raise errors.OpPrereqError("Instance '%s' not known" % name)
5791         self.wanted_names.append(full_name)
5792       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5793     else:
5794       self.wanted_names = None
5795       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5796
5797     self.needed_locks[locking.LEVEL_NODE] = []
5798     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5799
5800   def DeclareLocks(self, level):
5801     if level == locking.LEVEL_NODE:
5802       self._LockInstancesNodes()
5803
5804   def CheckPrereq(self):
5805     """Check prerequisites.
5806
5807     This only checks the optional instance list against the existing names.
5808
5809     """
5810     if self.wanted_names is None:
5811       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5812
5813     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5814                              in self.wanted_names]
5815     return
5816
5817   def _ComputeDiskStatus(self, instance, snode, dev):
5818     """Compute block device status.
5819
5820     """
5821     static = self.op.static
5822     if not static:
5823       self.cfg.SetDiskID(dev, instance.primary_node)
5824       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5825       if dev_pstatus.offline:
5826         dev_pstatus = None
5827       else:
5828         msg = dev_pstatus.RemoteFailMsg()
5829         if msg:
5830           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5831                                    (instance.name, msg))
5832         dev_pstatus = dev_pstatus.payload
5833     else:
5834       dev_pstatus = None
5835
5836     if dev.dev_type in constants.LDS_DRBD:
5837       # we change the snode then (otherwise we use the one passed in)
5838       if dev.logical_id[0] == instance.primary_node:
5839         snode = dev.logical_id[1]
5840       else:
5841         snode = dev.logical_id[0]
5842
5843     if snode and not static:
5844       self.cfg.SetDiskID(dev, snode)
5845       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5846       if dev_sstatus.offline:
5847         dev_sstatus = None
5848       else:
5849         msg = dev_sstatus.RemoteFailMsg()
5850         if msg:
5851           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5852                                    (instance.name, msg))
5853         dev_sstatus = dev_sstatus.payload
5854     else:
5855       dev_sstatus = None
5856
5857     if dev.children:
5858       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5859                       for child in dev.children]
5860     else:
5861       dev_children = []
5862
5863     data = {
5864       "iv_name": dev.iv_name,
5865       "dev_type": dev.dev_type,
5866       "logical_id": dev.logical_id,
5867       "physical_id": dev.physical_id,
5868       "pstatus": dev_pstatus,
5869       "sstatus": dev_sstatus,
5870       "children": dev_children,
5871       "mode": dev.mode,
5872       "size": dev.size,
5873       }
5874
5875     return data
5876
5877   def Exec(self, feedback_fn):
5878     """Gather and return data"""
5879     result = {}
5880
5881     cluster = self.cfg.GetClusterInfo()
5882
5883     for instance in self.wanted_instances:
5884       if not self.op.static:
5885         remote_info = self.rpc.call_instance_info(instance.primary_node,
5886                                                   instance.name,
5887                                                   instance.hypervisor)
5888         remote_info.Raise()
5889         remote_info = remote_info.data
5890         if remote_info and "state" in remote_info:
5891           remote_state = "up"
5892         else:
5893           remote_state = "down"
5894       else:
5895         remote_state = None
5896       if instance.admin_up:
5897         config_state = "up"
5898       else:
5899         config_state = "down"
5900
5901       disks = [self._ComputeDiskStatus(instance, None, device)
5902                for device in instance.disks]
5903
5904       idict = {
5905         "name": instance.name,
5906         "config_state": config_state,
5907         "run_state": remote_state,
5908         "pnode": instance.primary_node,
5909         "snodes": instance.secondary_nodes,
5910         "os": instance.os,
5911         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5912         "disks": disks,
5913         "hypervisor": instance.hypervisor,
5914         "network_port": instance.network_port,
5915         "hv_instance": instance.hvparams,
5916         "hv_actual": cluster.FillHV(instance),
5917         "be_instance": instance.beparams,
5918         "be_actual": cluster.FillBE(instance),
5919         }
5920
5921       result[instance.name] = idict
5922
5923     return result
5924
5925
5926 class LUSetInstanceParams(LogicalUnit):
5927   """Modifies an instances's parameters.
5928
5929   """
5930   HPATH = "instance-modify"
5931   HTYPE = constants.HTYPE_INSTANCE
5932   _OP_REQP = ["instance_name"]
5933   REQ_BGL = False
5934
5935   def CheckArguments(self):
5936     if not hasattr(self.op, 'nics'):
5937       self.op.nics = []
5938     if not hasattr(self.op, 'disks'):
5939       self.op.disks = []
5940     if not hasattr(self.op, 'beparams'):
5941       self.op.beparams = {}
5942     if not hasattr(self.op, 'hvparams'):
5943       self.op.hvparams = {}
5944     self.op.force = getattr(self.op, "force", False)
5945     if not (self.op.nics or self.op.disks or
5946             self.op.hvparams or self.op.beparams):
5947       raise errors.OpPrereqError("No changes submitted")
5948
5949     # Disk validation
5950     disk_addremove = 0
5951     for disk_op, disk_dict in self.op.disks:
5952       if disk_op == constants.DDM_REMOVE:
5953         disk_addremove += 1
5954         continue
5955       elif disk_op == constants.DDM_ADD:
5956         disk_addremove += 1
5957       else:
5958         if not isinstance(disk_op, int):
5959           raise errors.OpPrereqError("Invalid disk index")
5960       if disk_op == constants.DDM_ADD:
5961         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5962         if mode not in constants.DISK_ACCESS_SET:
5963           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5964         size = disk_dict.get('size', None)
5965         if size is None:
5966           raise errors.OpPrereqError("Required disk parameter size missing")
5967         try:
5968           size = int(size)
5969         except ValueError, err:
5970           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5971                                      str(err))
5972         disk_dict['size'] = size
5973       else:
5974         # modification of disk
5975         if 'size' in disk_dict:
5976           raise errors.OpPrereqError("Disk size change not possible, use"
5977                                      " grow-disk")
5978
5979     if disk_addremove > 1:
5980       raise errors.OpPrereqError("Only one disk add or remove operation"
5981                                  " supported at a time")
5982
5983     # NIC validation
5984     nic_addremove = 0
5985     for nic_op, nic_dict in self.op.nics:
5986       if nic_op == constants.DDM_REMOVE:
5987         nic_addremove += 1
5988         continue
5989       elif nic_op == constants.DDM_ADD:
5990         nic_addremove += 1
5991       else:
5992         if not isinstance(nic_op, int):
5993           raise errors.OpPrereqError("Invalid nic index")
5994
5995       # nic_dict should be a dict
5996       nic_ip = nic_dict.get('ip', None)
5997       if nic_ip is not None:
5998         if nic_ip.lower() == constants.VALUE_NONE:
5999           nic_dict['ip'] = None
6000         else:
6001           if not utils.IsValidIP(nic_ip):
6002             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6003
6004       if nic_op == constants.DDM_ADD:
6005         nic_bridge = nic_dict.get('bridge', None)
6006         if nic_bridge is None:
6007           nic_dict['bridge'] = self.cfg.GetDefBridge()
6008         nic_mac = nic_dict.get('mac', None)
6009         if nic_mac is None:
6010           nic_dict['mac'] = constants.VALUE_AUTO
6011
6012       if 'mac' in nic_dict:
6013         nic_mac = nic_dict['mac']
6014         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6015           if not utils.IsValidMac(nic_mac):
6016             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6017         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6018           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6019                                      " modifying an existing nic")
6020
6021     if nic_addremove > 1:
6022       raise errors.OpPrereqError("Only one NIC add or remove operation"
6023                                  " supported at a time")
6024
6025   def ExpandNames(self):
6026     self._ExpandAndLockInstance()
6027     self.needed_locks[locking.LEVEL_NODE] = []
6028     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6029
6030   def DeclareLocks(self, level):
6031     if level == locking.LEVEL_NODE:
6032       self._LockInstancesNodes()
6033
6034   def BuildHooksEnv(self):
6035     """Build hooks env.
6036
6037     This runs on the master, primary and secondaries.
6038
6039     """
6040     args = dict()
6041     if constants.BE_MEMORY in self.be_new:
6042       args['memory'] = self.be_new[constants.BE_MEMORY]
6043     if constants.BE_VCPUS in self.be_new:
6044       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6045     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6046     # information at all.
6047     if self.op.nics:
6048       args['nics'] = []
6049       nic_override = dict(self.op.nics)
6050       for idx, nic in enumerate(self.instance.nics):
6051         if idx in nic_override:
6052           this_nic_override = nic_override[idx]
6053         else:
6054           this_nic_override = {}
6055         if 'ip' in this_nic_override:
6056           ip = this_nic_override['ip']
6057         else:
6058           ip = nic.ip
6059         if 'bridge' in this_nic_override:
6060           bridge = this_nic_override['bridge']
6061         else:
6062           bridge = nic.bridge
6063         if 'mac' in this_nic_override:
6064           mac = this_nic_override['mac']
6065         else:
6066           mac = nic.mac
6067         args['nics'].append((ip, bridge, mac))
6068       if constants.DDM_ADD in nic_override:
6069         ip = nic_override[constants.DDM_ADD].get('ip', None)
6070         bridge = nic_override[constants.DDM_ADD]['bridge']
6071         mac = nic_override[constants.DDM_ADD]['mac']
6072         args['nics'].append((ip, bridge, mac))
6073       elif constants.DDM_REMOVE in nic_override:
6074         del args['nics'][-1]
6075
6076     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6077     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6078     return env, nl, nl
6079
6080   def CheckPrereq(self):
6081     """Check prerequisites.
6082
6083     This only checks the instance list against the existing names.
6084
6085     """
6086     self.force = self.op.force
6087
6088     # checking the new params on the primary/secondary nodes
6089
6090     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6091     assert self.instance is not None, \
6092       "Cannot retrieve locked instance %s" % self.op.instance_name
6093     pnode = instance.primary_node
6094     nodelist = list(instance.all_nodes)
6095
6096     # hvparams processing
6097     if self.op.hvparams:
6098       i_hvdict = copy.deepcopy(instance.hvparams)
6099       for key, val in self.op.hvparams.iteritems():
6100         if val == constants.VALUE_DEFAULT:
6101           try:
6102             del i_hvdict[key]
6103           except KeyError:
6104             pass
6105         else:
6106           i_hvdict[key] = val
6107       cluster = self.cfg.GetClusterInfo()
6108       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6109       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6110                                 i_hvdict)
6111       # local check
6112       hypervisor.GetHypervisor(
6113         instance.hypervisor).CheckParameterSyntax(hv_new)
6114       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6115       self.hv_new = hv_new # the new actual values
6116       self.hv_inst = i_hvdict # the new dict (without defaults)
6117     else:
6118       self.hv_new = self.hv_inst = {}
6119
6120     # beparams processing
6121     if self.op.beparams:
6122       i_bedict = copy.deepcopy(instance.beparams)
6123       for key, val in self.op.beparams.iteritems():
6124         if val == constants.VALUE_DEFAULT:
6125           try:
6126             del i_bedict[key]
6127           except KeyError:
6128             pass
6129         else:
6130           i_bedict[key] = val
6131       cluster = self.cfg.GetClusterInfo()
6132       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6133       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6134                                 i_bedict)
6135       self.be_new = be_new # the new actual values
6136       self.be_inst = i_bedict # the new dict (without defaults)
6137     else:
6138       self.be_new = self.be_inst = {}
6139
6140     self.warn = []
6141
6142     if constants.BE_MEMORY in self.op.beparams and not self.force:
6143       mem_check_list = [pnode]
6144       if be_new[constants.BE_AUTO_BALANCE]:
6145         # either we changed auto_balance to yes or it was from before
6146         mem_check_list.extend(instance.secondary_nodes)
6147       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6148                                                   instance.hypervisor)
6149       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6150                                          instance.hypervisor)
6151       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6152         # Assume the primary node is unreachable and go ahead
6153         self.warn.append("Can't get info from primary node %s" % pnode)
6154       else:
6155         if not instance_info.failed and instance_info.data:
6156           current_mem = int(instance_info.data['memory'])
6157         else:
6158           # Assume instance not running
6159           # (there is a slight race condition here, but it's not very probable,
6160           # and we have no other way to check)
6161           current_mem = 0
6162         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6163                     nodeinfo[pnode].data['memory_free'])
6164         if miss_mem > 0:
6165           raise errors.OpPrereqError("This change will prevent the instance"
6166                                      " from starting, due to %d MB of memory"
6167                                      " missing on its primary node" % miss_mem)
6168
6169       if be_new[constants.BE_AUTO_BALANCE]:
6170         for node, nres in nodeinfo.iteritems():
6171           if node not in instance.secondary_nodes:
6172             continue
6173           if nres.failed or not isinstance(nres.data, dict):
6174             self.warn.append("Can't get info from secondary node %s" % node)
6175           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6176             self.warn.append("Not enough memory to failover instance to"
6177                              " secondary node %s" % node)
6178
6179     # NIC processing
6180     for nic_op, nic_dict in self.op.nics:
6181       if nic_op == constants.DDM_REMOVE:
6182         if not instance.nics:
6183           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6184         continue
6185       if nic_op != constants.DDM_ADD:
6186         # an existing nic
6187         if nic_op < 0 or nic_op >= len(instance.nics):
6188           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6189                                      " are 0 to %d" %
6190                                      (nic_op, len(instance.nics)))
6191       if 'bridge' in nic_dict:
6192         nic_bridge = nic_dict['bridge']
6193         if nic_bridge is None:
6194           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6195         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6196           msg = ("Bridge '%s' doesn't exist on one of"
6197                  " the instance nodes" % nic_bridge)
6198           if self.force:
6199             self.warn.append(msg)
6200           else:
6201             raise errors.OpPrereqError(msg)
6202       if 'mac' in nic_dict:
6203         nic_mac = nic_dict['mac']
6204         if nic_mac is None:
6205           raise errors.OpPrereqError('Cannot set the nic mac to None')
6206         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6207           # otherwise generate the mac
6208           nic_dict['mac'] = self.cfg.GenerateMAC()
6209         else:
6210           # or validate/reserve the current one
6211           if self.cfg.IsMacInUse(nic_mac):
6212             raise errors.OpPrereqError("MAC address %s already in use"
6213                                        " in cluster" % nic_mac)
6214
6215     # DISK processing
6216     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6217       raise errors.OpPrereqError("Disk operations not supported for"
6218                                  " diskless instances")
6219     for disk_op, disk_dict in self.op.disks:
6220       if disk_op == constants.DDM_REMOVE:
6221         if len(instance.disks) == 1:
6222           raise errors.OpPrereqError("Cannot remove the last disk of"
6223                                      " an instance")
6224         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6225         ins_l = ins_l[pnode]
6226         if ins_l.failed or not isinstance(ins_l.data, list):
6227           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6228         if instance.name in ins_l.data:
6229           raise errors.OpPrereqError("Instance is running, can't remove"
6230                                      " disks.")
6231
6232       if (disk_op == constants.DDM_ADD and
6233           len(instance.nics) >= constants.MAX_DISKS):
6234         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6235                                    " add more" % constants.MAX_DISKS)
6236       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6237         # an existing disk
6238         if disk_op < 0 or disk_op >= len(instance.disks):
6239           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6240                                      " are 0 to %d" %
6241                                      (disk_op, len(instance.disks)))
6242
6243     return
6244
6245   def Exec(self, feedback_fn):
6246     """Modifies an instance.
6247
6248     All parameters take effect only at the next restart of the instance.
6249
6250     """
6251     # Process here the warnings from CheckPrereq, as we don't have a
6252     # feedback_fn there.
6253     for warn in self.warn:
6254       feedback_fn("WARNING: %s" % warn)
6255
6256     result = []
6257     instance = self.instance
6258     # disk changes
6259     for disk_op, disk_dict in self.op.disks:
6260       if disk_op == constants.DDM_REMOVE:
6261         # remove the last disk
6262         device = instance.disks.pop()
6263         device_idx = len(instance.disks)
6264         for node, disk in device.ComputeNodeTree(instance.primary_node):
6265           self.cfg.SetDiskID(disk, node)
6266           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6267           if msg:
6268             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6269                             " continuing anyway", device_idx, node, msg)
6270         result.append(("disk/%d" % device_idx, "remove"))
6271       elif disk_op == constants.DDM_ADD:
6272         # add a new disk
6273         if instance.disk_template == constants.DT_FILE:
6274           file_driver, file_path = instance.disks[0].logical_id
6275           file_path = os.path.dirname(file_path)
6276         else:
6277           file_driver = file_path = None
6278         disk_idx_base = len(instance.disks)
6279         new_disk = _GenerateDiskTemplate(self,
6280                                          instance.disk_template,
6281                                          instance.name, instance.primary_node,
6282                                          instance.secondary_nodes,
6283                                          [disk_dict],
6284                                          file_path,
6285                                          file_driver,
6286                                          disk_idx_base)[0]
6287         instance.disks.append(new_disk)
6288         info = _GetInstanceInfoText(instance)
6289
6290         logging.info("Creating volume %s for instance %s",
6291                      new_disk.iv_name, instance.name)
6292         # Note: this needs to be kept in sync with _CreateDisks
6293         #HARDCODE
6294         for node in instance.all_nodes:
6295           f_create = node == instance.primary_node
6296           try:
6297             _CreateBlockDev(self, node, instance, new_disk,
6298                             f_create, info, f_create)
6299           except errors.OpExecError, err:
6300             self.LogWarning("Failed to create volume %s (%s) on"
6301                             " node %s: %s",
6302                             new_disk.iv_name, new_disk, node, err)
6303         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6304                        (new_disk.size, new_disk.mode)))
6305       else:
6306         # change a given disk
6307         instance.disks[disk_op].mode = disk_dict['mode']
6308         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6309     # NIC changes
6310     for nic_op, nic_dict in self.op.nics:
6311       if nic_op == constants.DDM_REMOVE:
6312         # remove the last nic
6313         del instance.nics[-1]
6314         result.append(("nic.%d" % len(instance.nics), "remove"))
6315       elif nic_op == constants.DDM_ADD:
6316         # mac and bridge should be set, by now
6317         mac = nic_dict['mac']
6318         bridge = nic_dict['bridge']
6319         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6320                               bridge=bridge)
6321         instance.nics.append(new_nic)
6322         result.append(("nic.%d" % (len(instance.nics) - 1),
6323                        "add:mac=%s,ip=%s,bridge=%s" %
6324                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6325       else:
6326         # change a given nic
6327         for key in 'mac', 'ip', 'bridge':
6328           if key in nic_dict:
6329             setattr(instance.nics[nic_op], key, nic_dict[key])
6330             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6331
6332     # hvparams changes
6333     if self.op.hvparams:
6334       instance.hvparams = self.hv_inst
6335       for key, val in self.op.hvparams.iteritems():
6336         result.append(("hv/%s" % key, val))
6337
6338     # beparams changes
6339     if self.op.beparams:
6340       instance.beparams = self.be_inst
6341       for key, val in self.op.beparams.iteritems():
6342         result.append(("be/%s" % key, val))
6343
6344     self.cfg.Update(instance)
6345
6346     return result
6347
6348
6349 class LUQueryExports(NoHooksLU):
6350   """Query the exports list
6351
6352   """
6353   _OP_REQP = ['nodes']
6354   REQ_BGL = False
6355
6356   def ExpandNames(self):
6357     self.needed_locks = {}
6358     self.share_locks[locking.LEVEL_NODE] = 1
6359     if not self.op.nodes:
6360       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6361     else:
6362       self.needed_locks[locking.LEVEL_NODE] = \
6363         _GetWantedNodes(self, self.op.nodes)
6364
6365   def CheckPrereq(self):
6366     """Check prerequisites.
6367
6368     """
6369     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6370
6371   def Exec(self, feedback_fn):
6372     """Compute the list of all the exported system images.
6373
6374     @rtype: dict
6375     @return: a dictionary with the structure node->(export-list)
6376         where export-list is a list of the instances exported on
6377         that node.
6378
6379     """
6380     rpcresult = self.rpc.call_export_list(self.nodes)
6381     result = {}
6382     for node in rpcresult:
6383       if rpcresult[node].failed:
6384         result[node] = False
6385       else:
6386         result[node] = rpcresult[node].data
6387
6388     return result
6389
6390
6391 class LUExportInstance(LogicalUnit):
6392   """Export an instance to an image in the cluster.
6393
6394   """
6395   HPATH = "instance-export"
6396   HTYPE = constants.HTYPE_INSTANCE
6397   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6398   REQ_BGL = False
6399
6400   def ExpandNames(self):
6401     self._ExpandAndLockInstance()
6402     # FIXME: lock only instance primary and destination node
6403     #
6404     # Sad but true, for now we have do lock all nodes, as we don't know where
6405     # the previous export might be, and and in this LU we search for it and
6406     # remove it from its current node. In the future we could fix this by:
6407     #  - making a tasklet to search (share-lock all), then create the new one,
6408     #    then one to remove, after
6409     #  - removing the removal operation altogether
6410     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6411
6412   def DeclareLocks(self, level):
6413     """Last minute lock declaration."""
6414     # All nodes are locked anyway, so nothing to do here.
6415
6416   def BuildHooksEnv(self):
6417     """Build hooks env.
6418
6419     This will run on the master, primary node and target node.
6420
6421     """
6422     env = {
6423       "EXPORT_NODE": self.op.target_node,
6424       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6425       }
6426     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6427     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6428           self.op.target_node]
6429     return env, nl, nl
6430
6431   def CheckPrereq(self):
6432     """Check prerequisites.
6433
6434     This checks that the instance and node names are valid.
6435
6436     """
6437     instance_name = self.op.instance_name
6438     self.instance = self.cfg.GetInstanceInfo(instance_name)
6439     assert self.instance is not None, \
6440           "Cannot retrieve locked instance %s" % self.op.instance_name
6441     _CheckNodeOnline(self, self.instance.primary_node)
6442
6443     self.dst_node = self.cfg.GetNodeInfo(
6444       self.cfg.ExpandNodeName(self.op.target_node))
6445
6446     if self.dst_node is None:
6447       # This is wrong node name, not a non-locked node
6448       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6449     _CheckNodeOnline(self, self.dst_node.name)
6450     _CheckNodeNotDrained(self, self.dst_node.name)
6451
6452     # instance disk type verification
6453     for disk in self.instance.disks:
6454       if disk.dev_type == constants.LD_FILE:
6455         raise errors.OpPrereqError("Export not supported for instances with"
6456                                    " file-based disks")
6457
6458   def Exec(self, feedback_fn):
6459     """Export an instance to an image in the cluster.
6460
6461     """
6462     instance = self.instance
6463     dst_node = self.dst_node
6464     src_node = instance.primary_node
6465     if self.op.shutdown:
6466       # shutdown the instance, but not the disks
6467       result = self.rpc.call_instance_shutdown(src_node, instance)
6468       msg = result.RemoteFailMsg()
6469       if msg:
6470         raise errors.OpExecError("Could not shutdown instance %s on"
6471                                  " node %s: %s" %
6472                                  (instance.name, src_node, msg))
6473
6474     vgname = self.cfg.GetVGName()
6475
6476     snap_disks = []
6477
6478     # set the disks ID correctly since call_instance_start needs the
6479     # correct drbd minor to create the symlinks
6480     for disk in instance.disks:
6481       self.cfg.SetDiskID(disk, src_node)
6482
6483     # per-disk results
6484     dresults = []
6485     try:
6486       for idx, disk in enumerate(instance.disks):
6487         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6488         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6489         if new_dev_name.failed or not new_dev_name.data:
6490           self.LogWarning("Could not snapshot disk/%d on node %s",
6491                           idx, src_node)
6492           snap_disks.append(False)
6493         else:
6494           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6495                                  logical_id=(vgname, new_dev_name.data),
6496                                  physical_id=(vgname, new_dev_name.data),
6497                                  iv_name=disk.iv_name)
6498           snap_disks.append(new_dev)
6499
6500     finally:
6501       if self.op.shutdown and instance.admin_up:
6502         result = self.rpc.call_instance_start(src_node, instance, None, None)
6503         msg = result.RemoteFailMsg()
6504         if msg:
6505           _ShutdownInstanceDisks(self, instance)
6506           raise errors.OpExecError("Could not start instance: %s" % msg)
6507
6508     # TODO: check for size
6509
6510     cluster_name = self.cfg.GetClusterName()
6511     for idx, dev in enumerate(snap_disks):
6512       if dev:
6513         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6514                                                instance, cluster_name, idx)
6515         if result.failed or not result.data:
6516           self.LogWarning("Could not export disk/%d from node %s to"
6517                           " node %s", idx, src_node, dst_node.name)
6518           dresults.append(False)
6519         else:
6520           dresults.append(True)
6521         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6522         if msg:
6523           self.LogWarning("Could not remove snapshot for disk/%d from node"
6524                           " %s: %s", idx, src_node, msg)
6525       else:
6526         dresults.append(False)
6527
6528     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6529     fin_resu = True
6530     if result.failed or not result.data:
6531       self.LogWarning("Could not finalize export for instance %s on node %s",
6532                       instance.name, dst_node.name)
6533       fin_resu = False
6534
6535     nodelist = self.cfg.GetNodeList()
6536     nodelist.remove(dst_node.name)
6537
6538     # on one-node clusters nodelist will be empty after the removal
6539     # if we proceed the backup would be removed because OpQueryExports
6540     # substitutes an empty list with the full cluster node list.
6541     if nodelist:
6542       exportlist = self.rpc.call_export_list(nodelist)
6543       for node in exportlist:
6544         if exportlist[node].failed:
6545           continue
6546         if instance.name in exportlist[node].data:
6547           if not self.rpc.call_export_remove(node, instance.name):
6548             self.LogWarning("Could not remove older export for instance %s"
6549                             " on node %s", instance.name, node)
6550     return fin_resu, dresults
6551
6552
6553 class LURemoveExport(NoHooksLU):
6554   """Remove exports related to the named instance.
6555
6556   """
6557   _OP_REQP = ["instance_name"]
6558   REQ_BGL = False
6559
6560   def ExpandNames(self):
6561     self.needed_locks = {}
6562     # We need all nodes to be locked in order for RemoveExport to work, but we
6563     # don't need to lock the instance itself, as nothing will happen to it (and
6564     # we can remove exports also for a removed instance)
6565     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6566
6567   def CheckPrereq(self):
6568     """Check prerequisites.
6569     """
6570     pass
6571
6572   def Exec(self, feedback_fn):
6573     """Remove any export.
6574
6575     """
6576     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6577     # If the instance was not found we'll try with the name that was passed in.
6578     # This will only work if it was an FQDN, though.
6579     fqdn_warn = False
6580     if not instance_name:
6581       fqdn_warn = True
6582       instance_name = self.op.instance_name
6583
6584     exportlist = self.rpc.call_export_list(self.acquired_locks[
6585       locking.LEVEL_NODE])
6586     found = False
6587     for node in exportlist:
6588       if exportlist[node].failed:
6589         self.LogWarning("Failed to query node %s, continuing" % node)
6590         continue
6591       if instance_name in exportlist[node].data:
6592         found = True
6593         result = self.rpc.call_export_remove(node, instance_name)
6594         if result.failed or not result.data:
6595           logging.error("Could not remove export for instance %s"
6596                         " on node %s", instance_name, node)
6597
6598     if fqdn_warn and not found:
6599       feedback_fn("Export not found. If trying to remove an export belonging"
6600                   " to a deleted instance please use its Fully Qualified"
6601                   " Domain Name.")
6602
6603
6604 class TagsLU(NoHooksLU):
6605   """Generic tags LU.
6606
6607   This is an abstract class which is the parent of all the other tags LUs.
6608
6609   """
6610
6611   def ExpandNames(self):
6612     self.needed_locks = {}
6613     if self.op.kind == constants.TAG_NODE:
6614       name = self.cfg.ExpandNodeName(self.op.name)
6615       if name is None:
6616         raise errors.OpPrereqError("Invalid node name (%s)" %
6617                                    (self.op.name,))
6618       self.op.name = name
6619       self.needed_locks[locking.LEVEL_NODE] = name
6620     elif self.op.kind == constants.TAG_INSTANCE:
6621       name = self.cfg.ExpandInstanceName(self.op.name)
6622       if name is None:
6623         raise errors.OpPrereqError("Invalid instance name (%s)" %
6624                                    (self.op.name,))
6625       self.op.name = name
6626       self.needed_locks[locking.LEVEL_INSTANCE] = name
6627
6628   def CheckPrereq(self):
6629     """Check prerequisites.
6630
6631     """
6632     if self.op.kind == constants.TAG_CLUSTER:
6633       self.target = self.cfg.GetClusterInfo()
6634     elif self.op.kind == constants.TAG_NODE:
6635       self.target = self.cfg.GetNodeInfo(self.op.name)
6636     elif self.op.kind == constants.TAG_INSTANCE:
6637       self.target = self.cfg.GetInstanceInfo(self.op.name)
6638     else:
6639       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6640                                  str(self.op.kind))
6641
6642
6643 class LUGetTags(TagsLU):
6644   """Returns the tags of a given object.
6645
6646   """
6647   _OP_REQP = ["kind", "name"]
6648   REQ_BGL = False
6649
6650   def Exec(self, feedback_fn):
6651     """Returns the tag list.
6652
6653     """
6654     return list(self.target.GetTags())
6655
6656
6657 class LUSearchTags(NoHooksLU):
6658   """Searches the tags for a given pattern.
6659
6660   """
6661   _OP_REQP = ["pattern"]
6662   REQ_BGL = False
6663
6664   def ExpandNames(self):
6665     self.needed_locks = {}
6666
6667   def CheckPrereq(self):
6668     """Check prerequisites.
6669
6670     This checks the pattern passed for validity by compiling it.
6671
6672     """
6673     try:
6674       self.re = re.compile(self.op.pattern)
6675     except re.error, err:
6676       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6677                                  (self.op.pattern, err))
6678
6679   def Exec(self, feedback_fn):
6680     """Returns the tag list.
6681
6682     """
6683     cfg = self.cfg
6684     tgts = [("/cluster", cfg.GetClusterInfo())]
6685     ilist = cfg.GetAllInstancesInfo().values()
6686     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6687     nlist = cfg.GetAllNodesInfo().values()
6688     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6689     results = []
6690     for path, target in tgts:
6691       for tag in target.GetTags():
6692         if self.re.search(tag):
6693           results.append((path, tag))
6694     return results
6695
6696
6697 class LUAddTags(TagsLU):
6698   """Sets a tag on a given object.
6699
6700   """
6701   _OP_REQP = ["kind", "name", "tags"]
6702   REQ_BGL = False
6703
6704   def CheckPrereq(self):
6705     """Check prerequisites.
6706
6707     This checks the type and length of the tag name and value.
6708
6709     """
6710     TagsLU.CheckPrereq(self)
6711     for tag in self.op.tags:
6712       objects.TaggableObject.ValidateTag(tag)
6713
6714   def Exec(self, feedback_fn):
6715     """Sets the tag.
6716
6717     """
6718     try:
6719       for tag in self.op.tags:
6720         self.target.AddTag(tag)
6721     except errors.TagError, err:
6722       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6723     try:
6724       self.cfg.Update(self.target)
6725     except errors.ConfigurationError:
6726       raise errors.OpRetryError("There has been a modification to the"
6727                                 " config file and the operation has been"
6728                                 " aborted. Please retry.")
6729
6730
6731 class LUDelTags(TagsLU):
6732   """Delete a list of tags from a given object.
6733
6734   """
6735   _OP_REQP = ["kind", "name", "tags"]
6736   REQ_BGL = False
6737
6738   def CheckPrereq(self):
6739     """Check prerequisites.
6740
6741     This checks that we have the given tag.
6742
6743     """
6744     TagsLU.CheckPrereq(self)
6745     for tag in self.op.tags:
6746       objects.TaggableObject.ValidateTag(tag)
6747     del_tags = frozenset(self.op.tags)
6748     cur_tags = self.target.GetTags()
6749     if not del_tags <= cur_tags:
6750       diff_tags = del_tags - cur_tags
6751       diff_names = ["'%s'" % tag for tag in diff_tags]
6752       diff_names.sort()
6753       raise errors.OpPrereqError("Tag(s) %s not found" %
6754                                  (",".join(diff_names)))
6755
6756   def Exec(self, feedback_fn):
6757     """Remove the tag from the object.
6758
6759     """
6760     for tag in self.op.tags:
6761       self.target.RemoveTag(tag)
6762     try:
6763       self.cfg.Update(self.target)
6764     except errors.ConfigurationError:
6765       raise errors.OpRetryError("There has been a modification to the"
6766                                 " config file and the operation has been"
6767                                 " aborted. Please retry.")
6768
6769
6770 class LUTestDelay(NoHooksLU):
6771   """Sleep for a specified amount of time.
6772
6773   This LU sleeps on the master and/or nodes for a specified amount of
6774   time.
6775
6776   """
6777   _OP_REQP = ["duration", "on_master", "on_nodes"]
6778   REQ_BGL = False
6779
6780   def ExpandNames(self):
6781     """Expand names and set required locks.
6782
6783     This expands the node list, if any.
6784
6785     """
6786     self.needed_locks = {}
6787     if self.op.on_nodes:
6788       # _GetWantedNodes can be used here, but is not always appropriate to use
6789       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6790       # more information.
6791       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6792       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6793
6794   def CheckPrereq(self):
6795     """Check prerequisites.
6796
6797     """
6798
6799   def Exec(self, feedback_fn):
6800     """Do the actual sleep.
6801
6802     """
6803     if self.op.on_master:
6804       if not utils.TestDelay(self.op.duration):
6805         raise errors.OpExecError("Error during master delay test")
6806     if self.op.on_nodes:
6807       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6808       if not result:
6809         raise errors.OpExecError("Complete failure from rpc call")
6810       for node, node_result in result.items():
6811         node_result.Raise()
6812         if not node_result.data:
6813           raise errors.OpExecError("Failure during rpc call to node %s,"
6814                                    " result: %s" % (node, node_result.data))
6815
6816
6817 class IAllocator(object):
6818   """IAllocator framework.
6819
6820   An IAllocator instance has three sets of attributes:
6821     - cfg that is needed to query the cluster
6822     - input data (all members of the _KEYS class attribute are required)
6823     - four buffer attributes (in|out_data|text), that represent the
6824       input (to the external script) in text and data structure format,
6825       and the output from it, again in two formats
6826     - the result variables from the script (success, info, nodes) for
6827       easy usage
6828
6829   """
6830   _ALLO_KEYS = [
6831     "mem_size", "disks", "disk_template",
6832     "os", "tags", "nics", "vcpus", "hypervisor",
6833     ]
6834   _RELO_KEYS = [
6835     "relocate_from",
6836     ]
6837
6838   def __init__(self, lu, mode, name, **kwargs):
6839     self.lu = lu
6840     # init buffer variables
6841     self.in_text = self.out_text = self.in_data = self.out_data = None
6842     # init all input fields so that pylint is happy
6843     self.mode = mode
6844     self.name = name
6845     self.mem_size = self.disks = self.disk_template = None
6846     self.os = self.tags = self.nics = self.vcpus = None
6847     self.hypervisor = None
6848     self.relocate_from = None
6849     # computed fields
6850     self.required_nodes = None
6851     # init result fields
6852     self.success = self.info = self.nodes = None
6853     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6854       keyset = self._ALLO_KEYS
6855     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6856       keyset = self._RELO_KEYS
6857     else:
6858       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6859                                    " IAllocator" % self.mode)
6860     for key in kwargs:
6861       if key not in keyset:
6862         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6863                                      " IAllocator" % key)
6864       setattr(self, key, kwargs[key])
6865     for key in keyset:
6866       if key not in kwargs:
6867         raise errors.ProgrammerError("Missing input parameter '%s' to"
6868                                      " IAllocator" % key)
6869     self._BuildInputData()
6870
6871   def _ComputeClusterData(self):
6872     """Compute the generic allocator input data.
6873
6874     This is the data that is independent of the actual operation.
6875
6876     """
6877     cfg = self.lu.cfg
6878     cluster_info = cfg.GetClusterInfo()
6879     # cluster data
6880     data = {
6881       "version": constants.IALLOCATOR_VERSION,
6882       "cluster_name": cfg.GetClusterName(),
6883       "cluster_tags": list(cluster_info.GetTags()),
6884       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6885       # we don't have job IDs
6886       }
6887     iinfo = cfg.GetAllInstancesInfo().values()
6888     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6889
6890     # node data
6891     node_results = {}
6892     node_list = cfg.GetNodeList()
6893
6894     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6895       hypervisor_name = self.hypervisor
6896     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6897       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6898
6899     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6900                                            hypervisor_name)
6901     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6902                        cluster_info.enabled_hypervisors)
6903     for nname, nresult in node_data.items():
6904       # first fill in static (config-based) values
6905       ninfo = cfg.GetNodeInfo(nname)
6906       pnr = {
6907         "tags": list(ninfo.GetTags()),
6908         "primary_ip": ninfo.primary_ip,
6909         "secondary_ip": ninfo.secondary_ip,
6910         "offline": ninfo.offline,
6911         "drained": ninfo.drained,
6912         "master_candidate": ninfo.master_candidate,
6913         }
6914
6915       if not (ninfo.offline or ninfo.drained):
6916         nresult.Raise()
6917         if not isinstance(nresult.data, dict):
6918           raise errors.OpExecError("Can't get data for node %s" % nname)
6919         remote_info = nresult.data
6920         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6921                      'vg_size', 'vg_free', 'cpu_total']:
6922           if attr not in remote_info:
6923             raise errors.OpExecError("Node '%s' didn't return attribute"
6924                                      " '%s'" % (nname, attr))
6925           try:
6926             remote_info[attr] = int(remote_info[attr])
6927           except ValueError, err:
6928             raise errors.OpExecError("Node '%s' returned invalid value"
6929                                      " for '%s': %s" % (nname, attr, err))
6930         # compute memory used by primary instances
6931         i_p_mem = i_p_up_mem = 0
6932         for iinfo, beinfo in i_list:
6933           if iinfo.primary_node == nname:
6934             i_p_mem += beinfo[constants.BE_MEMORY]
6935             if iinfo.name not in node_iinfo[nname].data:
6936               i_used_mem = 0
6937             else:
6938               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6939             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6940             remote_info['memory_free'] -= max(0, i_mem_diff)
6941
6942             if iinfo.admin_up:
6943               i_p_up_mem += beinfo[constants.BE_MEMORY]
6944
6945         # compute memory used by instances
6946         pnr_dyn = {
6947           "total_memory": remote_info['memory_total'],
6948           "reserved_memory": remote_info['memory_dom0'],
6949           "free_memory": remote_info['memory_free'],
6950           "total_disk": remote_info['vg_size'],
6951           "free_disk": remote_info['vg_free'],
6952           "total_cpus": remote_info['cpu_total'],
6953           "i_pri_memory": i_p_mem,
6954           "i_pri_up_memory": i_p_up_mem,
6955           }
6956         pnr.update(pnr_dyn)
6957
6958       node_results[nname] = pnr
6959     data["nodes"] = node_results
6960
6961     # instance data
6962     instance_data = {}
6963     for iinfo, beinfo in i_list:
6964       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
6965                   for n in iinfo.nics]
6966       pir = {
6967         "tags": list(iinfo.GetTags()),
6968         "admin_up": iinfo.admin_up,
6969         "vcpus": beinfo[constants.BE_VCPUS],
6970         "memory": beinfo[constants.BE_MEMORY],
6971         "os": iinfo.os,
6972         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6973         "nics": nic_data,
6974         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6975         "disk_template": iinfo.disk_template,
6976         "hypervisor": iinfo.hypervisor,
6977         }
6978       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6979                                                  pir["disks"])
6980       instance_data[iinfo.name] = pir
6981
6982     data["instances"] = instance_data
6983
6984     self.in_data = data
6985
6986   def _AddNewInstance(self):
6987     """Add new instance data to allocator structure.
6988
6989     This in combination with _AllocatorGetClusterData will create the
6990     correct structure needed as input for the allocator.
6991
6992     The checks for the completeness of the opcode must have already been
6993     done.
6994
6995     """
6996     data = self.in_data
6997
6998     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6999
7000     if self.disk_template in constants.DTS_NET_MIRROR:
7001       self.required_nodes = 2
7002     else:
7003       self.required_nodes = 1
7004     request = {
7005       "type": "allocate",
7006       "name": self.name,
7007       "disk_template": self.disk_template,
7008       "tags": self.tags,
7009       "os": self.os,
7010       "vcpus": self.vcpus,
7011       "memory": self.mem_size,
7012       "disks": self.disks,
7013       "disk_space_total": disk_space,
7014       "nics": self.nics,
7015       "required_nodes": self.required_nodes,
7016       }
7017     data["request"] = request
7018
7019   def _AddRelocateInstance(self):
7020     """Add relocate instance data to allocator structure.
7021
7022     This in combination with _IAllocatorGetClusterData will create the
7023     correct structure needed as input for the allocator.
7024
7025     The checks for the completeness of the opcode must have already been
7026     done.
7027
7028     """
7029     instance = self.lu.cfg.GetInstanceInfo(self.name)
7030     if instance is None:
7031       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7032                                    " IAllocator" % self.name)
7033
7034     if instance.disk_template not in constants.DTS_NET_MIRROR:
7035       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7036
7037     if len(instance.secondary_nodes) != 1:
7038       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7039
7040     self.required_nodes = 1
7041     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7042     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7043
7044     request = {
7045       "type": "relocate",
7046       "name": self.name,
7047       "disk_space_total": disk_space,
7048       "required_nodes": self.required_nodes,
7049       "relocate_from": self.relocate_from,
7050       }
7051     self.in_data["request"] = request
7052
7053   def _BuildInputData(self):
7054     """Build input data structures.
7055
7056     """
7057     self._ComputeClusterData()
7058
7059     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7060       self._AddNewInstance()
7061     else:
7062       self._AddRelocateInstance()
7063
7064     self.in_text = serializer.Dump(self.in_data)
7065
7066   def Run(self, name, validate=True, call_fn=None):
7067     """Run an instance allocator and return the results.
7068
7069     """
7070     if call_fn is None:
7071       call_fn = self.lu.rpc.call_iallocator_runner
7072
7073     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7074     result.Raise()
7075
7076     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7077       raise errors.OpExecError("Invalid result from master iallocator runner")
7078
7079     rcode, stdout, stderr, fail = result.data
7080
7081     if rcode == constants.IARUN_NOTFOUND:
7082       raise errors.OpExecError("Can't find allocator '%s'" % name)
7083     elif rcode == constants.IARUN_FAILURE:
7084       raise errors.OpExecError("Instance allocator call failed: %s,"
7085                                " output: %s" % (fail, stdout+stderr))
7086     self.out_text = stdout
7087     if validate:
7088       self._ValidateResult()
7089
7090   def _ValidateResult(self):
7091     """Process the allocator results.
7092
7093     This will process and if successful save the result in
7094     self.out_data and the other parameters.
7095
7096     """
7097     try:
7098       rdict = serializer.Load(self.out_text)
7099     except Exception, err:
7100       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7101
7102     if not isinstance(rdict, dict):
7103       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7104
7105     for key in "success", "info", "nodes":
7106       if key not in rdict:
7107         raise errors.OpExecError("Can't parse iallocator results:"
7108                                  " missing key '%s'" % key)
7109       setattr(self, key, rdict[key])
7110
7111     if not isinstance(rdict["nodes"], list):
7112       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7113                                " is not a list")
7114     self.out_data = rdict
7115
7116
7117 class LUTestAllocator(NoHooksLU):
7118   """Run allocator tests.
7119
7120   This LU runs the allocator tests
7121
7122   """
7123   _OP_REQP = ["direction", "mode", "name"]
7124
7125   def CheckPrereq(self):
7126     """Check prerequisites.
7127
7128     This checks the opcode parameters depending on the director and mode test.
7129
7130     """
7131     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7132       for attr in ["name", "mem_size", "disks", "disk_template",
7133                    "os", "tags", "nics", "vcpus"]:
7134         if not hasattr(self.op, attr):
7135           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7136                                      attr)
7137       iname = self.cfg.ExpandInstanceName(self.op.name)
7138       if iname is not None:
7139         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7140                                    iname)
7141       if not isinstance(self.op.nics, list):
7142         raise errors.OpPrereqError("Invalid parameter 'nics'")
7143       for row in self.op.nics:
7144         if (not isinstance(row, dict) or
7145             "mac" not in row or
7146             "ip" not in row or
7147             "bridge" not in row):
7148           raise errors.OpPrereqError("Invalid contents of the"
7149                                      " 'nics' parameter")
7150       if not isinstance(self.op.disks, list):
7151         raise errors.OpPrereqError("Invalid parameter 'disks'")
7152       for row in self.op.disks:
7153         if (not isinstance(row, dict) or
7154             "size" not in row or
7155             not isinstance(row["size"], int) or
7156             "mode" not in row or
7157             row["mode"] not in ['r', 'w']):
7158           raise errors.OpPrereqError("Invalid contents of the"
7159                                      " 'disks' parameter")
7160       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7161         self.op.hypervisor = self.cfg.GetHypervisorType()
7162     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7163       if not hasattr(self.op, "name"):
7164         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7165       fname = self.cfg.ExpandInstanceName(self.op.name)
7166       if fname is None:
7167         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7168                                    self.op.name)
7169       self.op.name = fname
7170       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7171     else:
7172       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7173                                  self.op.mode)
7174
7175     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7176       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7177         raise errors.OpPrereqError("Missing allocator name")
7178     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7179       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7180                                  self.op.direction)
7181
7182   def Exec(self, feedback_fn):
7183     """Run the allocator test.
7184
7185     """
7186     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7187       ial = IAllocator(self,
7188                        mode=self.op.mode,
7189                        name=self.op.name,
7190                        mem_size=self.op.mem_size,
7191                        disks=self.op.disks,
7192                        disk_template=self.op.disk_template,
7193                        os=self.op.os,
7194                        tags=self.op.tags,
7195                        nics=self.op.nics,
7196                        vcpus=self.op.vcpus,
7197                        hypervisor=self.op.hypervisor,
7198                        )
7199     else:
7200       ial = IAllocator(self,
7201                        mode=self.op.mode,
7202                        name=self.op.name,
7203                        relocate_from=list(self.relocate_from),
7204                        )
7205
7206     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7207       result = ial.in_text
7208     else:
7209       ial.Run(self.op.allocator, validate=False)
7210       result = ial.out_text
7211     return result