Fix two bugs in seldom-used codepaths
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning # pylint: disable-msg=C0103
88     self.LogInfo = processor.LogInfo # pylint: disable-msg=C0103
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit): # pylint: disable-msg=W0223
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor_name': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURepairDiskSizes(NoHooksLU):
1329   """Verifies the cluster disks sizes.
1330
1331   """
1332   _OP_REQP = ["instances"]
1333   REQ_BGL = False
1334
1335   def ExpandNames(self):
1336
1337     if not isinstance(self.op.instances, list):
1338       raise errors.OpPrereqError("Invalid argument type 'instances'")
1339
1340     if self.op.instances:
1341       self.wanted_names = []
1342       for name in self.op.instances:
1343         full_name = self.cfg.ExpandInstanceName(name)
1344         if full_name is None:
1345           raise errors.OpPrereqError("Instance '%s' not known" % name)
1346         self.wanted_names.append(full_name)
1347       self.needed_locks = {
1348         locking.LEVEL_NODE: [],
1349         locking.LEVEL_INSTANCE: self.wanted_names,
1350         }
1351       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1352     else:
1353       self.wanted_names = None
1354       self.needed_locks = {
1355         locking.LEVEL_NODE: locking.ALL_SET,
1356         locking.LEVEL_INSTANCE: locking.ALL_SET,
1357         }
1358     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1359
1360   def DeclareLocks(self, level):
1361     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1362       self._LockInstancesNodes(primary_only=True)
1363
1364   def CheckPrereq(self):
1365     """Check prerequisites.
1366
1367     This only checks the optional instance list against the existing names.
1368
1369     """
1370     if self.wanted_names is None:
1371       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1372
1373     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1374                              in self.wanted_names]
1375
1376   def _EnsureChildSizes(self, disk):
1377     """Ensure children of the disk have the needed disk size.
1378
1379     This is valid mainly for DRBD8 and fixes an issue where the
1380     children have smaller disk size.
1381
1382     @param disk: an L{ganeti.objects.Disk} object
1383
1384     """
1385     if disk.dev_type == constants.LD_DRBD8:
1386       assert disk.children, "Empty children for DRBD8?"
1387       fchild = disk.children[0]
1388       mismatch = fchild.size < disk.size
1389       if mismatch:
1390         self.LogInfo("Child disk has size %d, parent %d, fixing",
1391                      fchild.size, disk.size)
1392         fchild.size = disk.size
1393
1394       # and we recurse on this child only, not on the metadev
1395       return self._EnsureChildSizes(fchild) or mismatch
1396     else:
1397       return False
1398
1399   def Exec(self, feedback_fn):
1400     """Verify the size of cluster disks.
1401
1402     """
1403     # TODO: check child disks too
1404     # TODO: check differences in size between primary/secondary nodes
1405     per_node_disks = {}
1406     for instance in self.wanted_instances:
1407       pnode = instance.primary_node
1408       if pnode not in per_node_disks:
1409         per_node_disks[pnode] = []
1410       for idx, disk in enumerate(instance.disks):
1411         per_node_disks[pnode].append((instance, idx, disk))
1412
1413     changed = []
1414     for node, dskl in per_node_disks.items():
1415       newl = [v[2].Copy() for v in dskl]
1416       for dsk in newl:
1417         self.cfg.SetDiskID(dsk, node)
1418       result = self.rpc.call_blockdev_getsizes(node, newl)
1419       if result.failed:
1420         self.LogWarning("Failure in blockdev_getsizes call to node"
1421                         " %s, ignoring", node)
1422         continue
1423       if len(result.data) != len(dskl):
1424         self.LogWarning("Invalid result from node %s, ignoring node results",
1425                         node)
1426         continue
1427       for ((instance, idx, disk), size) in zip(dskl, result.data):
1428         if size is None:
1429           self.LogWarning("Disk %d of instance %s did not return size"
1430                           " information, ignoring", idx, instance.name)
1431           continue
1432         if not isinstance(size, (int, long)):
1433           self.LogWarning("Disk %d of instance %s did not return valid"
1434                           " size information, ignoring", idx, instance.name)
1435           continue
1436         size = size >> 20
1437         if size != disk.size:
1438           self.LogInfo("Disk %d of instance %s has mismatched size,"
1439                        " correcting: recorded %d, actual %d", idx,
1440                        instance.name, disk.size, size)
1441           disk.size = size
1442           self.cfg.Update(instance)
1443           changed.append((instance.name, idx, size))
1444         if self._EnsureChildSizes(disk):
1445           self.cfg.Update(instance)
1446           changed.append((instance.name, idx, disk.size))
1447     return changed
1448
1449
1450 class LURenameCluster(LogicalUnit):
1451   """Rename the cluster.
1452
1453   """
1454   HPATH = "cluster-rename"
1455   HTYPE = constants.HTYPE_CLUSTER
1456   _OP_REQP = ["name"]
1457
1458   def BuildHooksEnv(self):
1459     """Build hooks env.
1460
1461     """
1462     env = {
1463       "OP_TARGET": self.cfg.GetClusterName(),
1464       "NEW_NAME": self.op.name,
1465       }
1466     mn = self.cfg.GetMasterNode()
1467     return env, [mn], [mn]
1468
1469   def CheckPrereq(self):
1470     """Verify that the passed name is a valid one.
1471
1472     """
1473     hostname = utils.HostInfo(self.op.name)
1474
1475     new_name = hostname.name
1476     self.ip = new_ip = hostname.ip
1477     old_name = self.cfg.GetClusterName()
1478     old_ip = self.cfg.GetMasterIP()
1479     if new_name == old_name and new_ip == old_ip:
1480       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1481                                  " cluster has changed")
1482     if new_ip != old_ip:
1483       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1484         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1485                                    " reachable on the network. Aborting." %
1486                                    new_ip)
1487
1488     self.op.name = new_name
1489
1490   def Exec(self, feedback_fn):
1491     """Rename the cluster.
1492
1493     """
1494     clustername = self.op.name
1495     ip = self.ip
1496
1497     # shutdown the master IP
1498     master = self.cfg.GetMasterNode()
1499     result = self.rpc.call_node_stop_master(master, False)
1500     if result.failed or not result.data:
1501       raise errors.OpExecError("Could not disable the master role")
1502
1503     try:
1504       cluster = self.cfg.GetClusterInfo()
1505       cluster.cluster_name = clustername
1506       cluster.master_ip = ip
1507       self.cfg.Update(cluster)
1508
1509       # update the known hosts file
1510       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1511       node_list = self.cfg.GetNodeList()
1512       try:
1513         node_list.remove(master)
1514       except ValueError:
1515         pass
1516       result = self.rpc.call_upload_file(node_list,
1517                                          constants.SSH_KNOWN_HOSTS_FILE)
1518       for to_node, to_result in result.iteritems():
1519         if to_result.failed or not to_result.data:
1520           logging.error("Copy of file %s to node %s failed",
1521                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1522
1523     finally:
1524       result = self.rpc.call_node_start_master(master, False, False)
1525       if result.failed or not result.data:
1526         self.LogWarning("Could not re-enable the master role on"
1527                         " the master, please restart manually.")
1528
1529
1530 def _RecursiveCheckIfLVMBased(disk):
1531   """Check if the given disk or its children are lvm-based.
1532
1533   @type disk: L{objects.Disk}
1534   @param disk: the disk to check
1535   @rtype: boolean
1536   @return: boolean indicating whether a LD_LV dev_type was found or not
1537
1538   """
1539   if disk.children:
1540     for chdisk in disk.children:
1541       if _RecursiveCheckIfLVMBased(chdisk):
1542         return True
1543   return disk.dev_type == constants.LD_LV
1544
1545
1546 class LUSetClusterParams(LogicalUnit):
1547   """Change the parameters of the cluster.
1548
1549   """
1550   HPATH = "cluster-modify"
1551   HTYPE = constants.HTYPE_CLUSTER
1552   _OP_REQP = []
1553   REQ_BGL = False
1554
1555   def CheckArguments(self):
1556     """Check parameters
1557
1558     """
1559     if not hasattr(self.op, "candidate_pool_size"):
1560       self.op.candidate_pool_size = None
1561     if self.op.candidate_pool_size is not None:
1562       try:
1563         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1564       except (ValueError, TypeError), err:
1565         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1566                                    str(err))
1567       if self.op.candidate_pool_size < 1:
1568         raise errors.OpPrereqError("At least one master candidate needed")
1569
1570   def ExpandNames(self):
1571     # FIXME: in the future maybe other cluster params won't require checking on
1572     # all nodes to be modified.
1573     self.needed_locks = {
1574       locking.LEVEL_NODE: locking.ALL_SET,
1575     }
1576     self.share_locks[locking.LEVEL_NODE] = 1
1577
1578   def BuildHooksEnv(self):
1579     """Build hooks env.
1580
1581     """
1582     env = {
1583       "OP_TARGET": self.cfg.GetClusterName(),
1584       "NEW_VG_NAME": self.op.vg_name,
1585       }
1586     mn = self.cfg.GetMasterNode()
1587     return env, [mn], [mn]
1588
1589   def CheckPrereq(self):
1590     """Check prerequisites.
1591
1592     This checks whether the given params don't conflict and
1593     if the given volume group is valid.
1594
1595     """
1596     if self.op.vg_name is not None and not self.op.vg_name:
1597       instances = self.cfg.GetAllInstancesInfo().values()
1598       for inst in instances:
1599         for disk in inst.disks:
1600           if _RecursiveCheckIfLVMBased(disk):
1601             raise errors.OpPrereqError("Cannot disable lvm storage while"
1602                                        " lvm-based instances exist")
1603
1604     node_list = self.acquired_locks[locking.LEVEL_NODE]
1605
1606     # if vg_name not None, checks given volume group on all nodes
1607     if self.op.vg_name:
1608       vglist = self.rpc.call_vg_list(node_list)
1609       for node in node_list:
1610         if vglist[node].failed:
1611           # ignoring down node
1612           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1613           continue
1614         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1615                                               self.op.vg_name,
1616                                               constants.MIN_VG_SIZE)
1617         if vgstatus:
1618           raise errors.OpPrereqError("Error on node '%s': %s" %
1619                                      (node, vgstatus))
1620
1621     self.cluster = cluster = self.cfg.GetClusterInfo()
1622     # validate beparams changes
1623     if self.op.beparams:
1624       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1625       self.new_beparams = cluster.FillDict(
1626         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1627
1628     # hypervisor list/parameters
1629     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1630     if self.op.hvparams:
1631       if not isinstance(self.op.hvparams, dict):
1632         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1633       for hv_name, hv_dict in self.op.hvparams.items():
1634         if hv_name not in self.new_hvparams:
1635           self.new_hvparams[hv_name] = hv_dict
1636         else:
1637           self.new_hvparams[hv_name].update(hv_dict)
1638
1639     if self.op.enabled_hypervisors is not None:
1640       self.hv_list = self.op.enabled_hypervisors
1641       if not self.hv_list:
1642         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1643                                    " least one member")
1644       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1645       if invalid_hvs:
1646         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1647                                    " entries: %s" % invalid_hvs)
1648     else:
1649       self.hv_list = cluster.enabled_hypervisors
1650
1651     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1652       # either the enabled list has changed, or the parameters have, validate
1653       for hv_name, hv_params in self.new_hvparams.items():
1654         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1655             (self.op.enabled_hypervisors and
1656              hv_name in self.op.enabled_hypervisors)):
1657           # either this is a new hypervisor, or its parameters have changed
1658           hv_class = hypervisor.GetHypervisor(hv_name)
1659           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1660           hv_class.CheckParameterSyntax(hv_params)
1661           _CheckHVParams(self, node_list, hv_name, hv_params)
1662
1663   def Exec(self, feedback_fn):
1664     """Change the parameters of the cluster.
1665
1666     """
1667     if self.op.vg_name is not None:
1668       new_volume = self.op.vg_name
1669       if not new_volume:
1670         new_volume = None
1671       if new_volume != self.cfg.GetVGName():
1672         self.cfg.SetVGName(new_volume)
1673       else:
1674         feedback_fn("Cluster LVM configuration already in desired"
1675                     " state, not changing")
1676     if self.op.hvparams:
1677       self.cluster.hvparams = self.new_hvparams
1678     if self.op.enabled_hypervisors is not None:
1679       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1680     if self.op.beparams:
1681       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1682     if self.op.candidate_pool_size is not None:
1683       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1684       # we need to update the pool size here, otherwise the save will fail
1685       _AdjustCandidatePool(self)
1686
1687     self.cfg.Update(self.cluster)
1688
1689
1690 class LURedistributeConfig(NoHooksLU):
1691   """Force the redistribution of cluster configuration.
1692
1693   This is a very simple LU.
1694
1695   """
1696   _OP_REQP = []
1697   REQ_BGL = False
1698
1699   def ExpandNames(self):
1700     self.needed_locks = {
1701       locking.LEVEL_NODE: locking.ALL_SET,
1702     }
1703     self.share_locks[locking.LEVEL_NODE] = 1
1704
1705   def CheckPrereq(self):
1706     """Check prerequisites.
1707
1708     """
1709
1710   def Exec(self, feedback_fn):
1711     """Redistribute the configuration.
1712
1713     """
1714     self.cfg.Update(self.cfg.GetClusterInfo())
1715
1716
1717 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1718   """Sleep and poll for an instance's disk to sync.
1719
1720   """
1721   if not instance.disks:
1722     return True
1723
1724   if not oneshot:
1725     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1726
1727   node = instance.primary_node
1728
1729   for dev in instance.disks:
1730     lu.cfg.SetDiskID(dev, node)
1731
1732   retries = 0
1733   degr_retries = 10 # in seconds, as we sleep 1 second each time
1734   while True:
1735     max_time = 0
1736     done = True
1737     cumul_degraded = False
1738     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1739     if rstats.failed or not rstats.data:
1740       lu.LogWarning("Can't get any data from node %s", node)
1741       retries += 1
1742       if retries >= 10:
1743         raise errors.RemoteError("Can't contact node %s for mirror data,"
1744                                  " aborting." % node)
1745       time.sleep(6)
1746       continue
1747     rstats = rstats.data
1748     retries = 0
1749     for i, mstat in enumerate(rstats):
1750       if mstat is None:
1751         lu.LogWarning("Can't compute data for node %s/%s",
1752                            node, instance.disks[i].iv_name)
1753         continue
1754       # we ignore the ldisk parameter
1755       perc_done, est_time, is_degraded, _ = mstat
1756       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1757       if perc_done is not None:
1758         done = False
1759         if est_time is not None:
1760           rem_time = "%d estimated seconds remaining" % est_time
1761           max_time = est_time
1762         else:
1763           rem_time = "no time estimate"
1764         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1765                         (instance.disks[i].iv_name, perc_done, rem_time))
1766
1767     # if we're done but degraded, let's do a few small retries, to
1768     # make sure we see a stable and not transient situation; therefore
1769     # we force restart of the loop
1770     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1771       logging.info("Degraded disks found, %d retries left", degr_retries)
1772       degr_retries -= 1
1773       time.sleep(1)
1774       continue
1775
1776     if done or oneshot:
1777       break
1778
1779     time.sleep(min(60, max_time))
1780
1781   if done:
1782     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1783   return not cumul_degraded
1784
1785
1786 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1787   """Check that mirrors are not degraded.
1788
1789   The ldisk parameter, if True, will change the test from the
1790   is_degraded attribute (which represents overall non-ok status for
1791   the device(s)) to the ldisk (representing the local storage status).
1792
1793   """
1794   lu.cfg.SetDiskID(dev, node)
1795   if ldisk:
1796     idx = 6
1797   else:
1798     idx = 5
1799
1800   result = True
1801   if on_primary or dev.AssembleOnSecondary():
1802     rstats = lu.rpc.call_blockdev_find(node, dev)
1803     msg = rstats.RemoteFailMsg()
1804     if msg:
1805       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1806       result = False
1807     elif not rstats.payload:
1808       lu.LogWarning("Can't find disk on node %s", node)
1809       result = False
1810     else:
1811       result = result and (not rstats.payload[idx])
1812   if dev.children:
1813     for child in dev.children:
1814       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1815
1816   return result
1817
1818
1819 class LUDiagnoseOS(NoHooksLU):
1820   """Logical unit for OS diagnose/query.
1821
1822   """
1823   _OP_REQP = ["output_fields", "names"]
1824   REQ_BGL = False
1825   _FIELDS_STATIC = utils.FieldSet()
1826   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1827
1828   def ExpandNames(self):
1829     if self.op.names:
1830       raise errors.OpPrereqError("Selective OS query not supported")
1831
1832     _CheckOutputFields(static=self._FIELDS_STATIC,
1833                        dynamic=self._FIELDS_DYNAMIC,
1834                        selected=self.op.output_fields)
1835
1836     # Lock all nodes, in shared mode
1837     # Temporary removal of locks, should be reverted later
1838     # TODO: reintroduce locks when they are lighter-weight
1839     self.needed_locks = {}
1840     #self.share_locks[locking.LEVEL_NODE] = 1
1841     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1842
1843   def CheckPrereq(self):
1844     """Check prerequisites.
1845
1846     """
1847
1848   @staticmethod
1849   def _DiagnoseByOS(node_list, rlist):
1850     """Remaps a per-node return list into an a per-os per-node dictionary
1851
1852     @param node_list: a list with the names of all nodes
1853     @param rlist: a map with node names as keys and OS objects as values
1854
1855     @rtype: dict
1856     @return: a dictionary with osnames as keys and as value another map, with
1857         nodes as keys and list of OS objects as values, eg::
1858
1859           {"debian-etch": {"node1": [<object>,...],
1860                            "node2": [<object>,]}
1861           }
1862
1863     """
1864     all_os = {}
1865     # we build here the list of nodes that didn't fail the RPC (at RPC
1866     # level), so that nodes with a non-responding node daemon don't
1867     # make all OSes invalid
1868     good_nodes = [node_name for node_name in rlist
1869                   if not rlist[node_name].failed]
1870     for node_name, nr in rlist.iteritems():
1871       if nr.failed or not nr.data:
1872         continue
1873       for os_obj in nr.data:
1874         if os_obj.name not in all_os:
1875           # build a list of nodes for this os containing empty lists
1876           # for each node in node_list
1877           all_os[os_obj.name] = {}
1878           for nname in good_nodes:
1879             all_os[os_obj.name][nname] = []
1880         all_os[os_obj.name][node_name].append(os_obj)
1881     return all_os
1882
1883   def Exec(self, feedback_fn):
1884     """Compute the list of OSes.
1885
1886     """
1887     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1888     node_data = self.rpc.call_os_diagnose(valid_nodes)
1889     if node_data == False:
1890       raise errors.OpExecError("Can't gather the list of OSes")
1891     pol = self._DiagnoseByOS(valid_nodes, node_data)
1892     output = []
1893     for os_name, os_data in pol.iteritems():
1894       row = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = os_name
1898         elif field == "valid":
1899           val = utils.all([osl and osl[0] for osl in os_data.values()])
1900         elif field == "node_status":
1901           val = {}
1902           for node_name, nos_list in os_data.iteritems():
1903             val[node_name] = [(v.status, v.path) for v in nos_list]
1904         else:
1905           raise errors.ParameterError(field)
1906         row.append(val)
1907       output.append(row)
1908
1909     return output
1910
1911
1912 class LURemoveNode(LogicalUnit):
1913   """Logical unit for removing a node.
1914
1915   """
1916   HPATH = "node-remove"
1917   HTYPE = constants.HTYPE_NODE
1918   _OP_REQP = ["node_name"]
1919
1920   def BuildHooksEnv(self):
1921     """Build hooks env.
1922
1923     This doesn't run on the target node in the pre phase as a failed
1924     node would then be impossible to remove.
1925
1926     """
1927     env = {
1928       "OP_TARGET": self.op.node_name,
1929       "NODE_NAME": self.op.node_name,
1930       }
1931     all_nodes = self.cfg.GetNodeList()
1932     all_nodes.remove(self.op.node_name)
1933     return env, all_nodes, all_nodes
1934
1935   def CheckPrereq(self):
1936     """Check prerequisites.
1937
1938     This checks:
1939      - the node exists in the configuration
1940      - it does not have primary or secondary instances
1941      - it's not the master
1942
1943     Any errors are signaled by raising errors.OpPrereqError.
1944
1945     """
1946     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1947     if node is None:
1948       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1949
1950     instance_list = self.cfg.GetInstanceList()
1951
1952     masternode = self.cfg.GetMasterNode()
1953     if node.name == masternode:
1954       raise errors.OpPrereqError("Node is the master node,"
1955                                  " you need to failover first.")
1956
1957     for instance_name in instance_list:
1958       instance = self.cfg.GetInstanceInfo(instance_name)
1959       if node.name in instance.all_nodes:
1960         raise errors.OpPrereqError("Instance %s is still running on the node,"
1961                                    " please remove first." % instance_name)
1962     self.op.node_name = node.name
1963     self.node = node
1964
1965   def Exec(self, feedback_fn):
1966     """Removes the node from the cluster.
1967
1968     """
1969     node = self.node
1970     logging.info("Stopping the node daemon and removing configs from node %s",
1971                  node.name)
1972
1973     self.context.RemoveNode(node.name)
1974
1975     self.rpc.call_node_leave_cluster(node.name)
1976
1977     # Promote nodes to master candidate as needed
1978     _AdjustCandidatePool(self)
1979
1980
1981 class LUQueryNodes(NoHooksLU):
1982   """Logical unit for querying nodes.
1983
1984   """
1985   _OP_REQP = ["output_fields", "names", "use_locking"]
1986   REQ_BGL = False
1987   _FIELDS_DYNAMIC = utils.FieldSet(
1988     "dtotal", "dfree",
1989     "mtotal", "mnode", "mfree",
1990     "bootid",
1991     "ctotal", "cnodes", "csockets",
1992     )
1993
1994   _FIELDS_STATIC = utils.FieldSet(
1995     "name", "pinst_cnt", "sinst_cnt",
1996     "pinst_list", "sinst_list",
1997     "pip", "sip", "tags",
1998     "serial_no",
1999     "master_candidate",
2000     "master",
2001     "offline",
2002     "drained",
2003     "role",
2004     )
2005
2006   def ExpandNames(self):
2007     _CheckOutputFields(static=self._FIELDS_STATIC,
2008                        dynamic=self._FIELDS_DYNAMIC,
2009                        selected=self.op.output_fields)
2010
2011     self.needed_locks = {}
2012     self.share_locks[locking.LEVEL_NODE] = 1
2013
2014     if self.op.names:
2015       self.wanted = _GetWantedNodes(self, self.op.names)
2016     else:
2017       self.wanted = locking.ALL_SET
2018
2019     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2020     self.do_locking = self.do_node_query and self.op.use_locking
2021     if self.do_locking:
2022       # if we don't request only static fields, we need to lock the nodes
2023       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2024
2025
2026   def CheckPrereq(self):
2027     """Check prerequisites.
2028
2029     """
2030     # The validation of the node list is done in the _GetWantedNodes,
2031     # if non empty, and if empty, there's no validation to do
2032     pass
2033
2034   def Exec(self, feedback_fn):
2035     """Computes the list of nodes and their attributes.
2036
2037     """
2038     all_info = self.cfg.GetAllNodesInfo()
2039     if self.do_locking:
2040       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2041     elif self.wanted != locking.ALL_SET:
2042       nodenames = self.wanted
2043       missing = set(nodenames).difference(all_info.keys())
2044       if missing:
2045         raise errors.OpExecError(
2046           "Some nodes were removed before retrieving their data: %s" % missing)
2047     else:
2048       nodenames = all_info.keys()
2049
2050     nodenames = utils.NiceSort(nodenames)
2051     nodelist = [all_info[name] for name in nodenames]
2052
2053     # begin data gathering
2054
2055     if self.do_node_query:
2056       live_data = {}
2057       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2058                                           self.cfg.GetHypervisorType())
2059       for name in nodenames:
2060         nodeinfo = node_data[name]
2061         if not nodeinfo.failed and nodeinfo.data:
2062           nodeinfo = nodeinfo.data
2063           fn = utils.TryConvert
2064           live_data[name] = {
2065             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2066             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2067             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2068             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2069             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2070             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2071             "bootid": nodeinfo.get('bootid', None),
2072             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2073             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2074             }
2075         else:
2076           live_data[name] = {}
2077     else:
2078       live_data = dict.fromkeys(nodenames, {})
2079
2080     node_to_primary = dict([(name, set()) for name in nodenames])
2081     node_to_secondary = dict([(name, set()) for name in nodenames])
2082
2083     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2084                              "sinst_cnt", "sinst_list"))
2085     if inst_fields & frozenset(self.op.output_fields):
2086       inst_data = self.cfg.GetAllInstancesInfo()
2087
2088       for instance_name, inst in inst_data.items():
2089         if inst.primary_node in node_to_primary:
2090           node_to_primary[inst.primary_node].add(inst.name)
2091         for secnode in inst.secondary_nodes:
2092           if secnode in node_to_secondary:
2093             node_to_secondary[secnode].add(inst.name)
2094
2095     master_node = self.cfg.GetMasterNode()
2096
2097     # end data gathering
2098
2099     output = []
2100     for node in nodelist:
2101       node_output = []
2102       for field in self.op.output_fields:
2103         if field == "name":
2104           val = node.name
2105         elif field == "pinst_list":
2106           val = list(node_to_primary[node.name])
2107         elif field == "sinst_list":
2108           val = list(node_to_secondary[node.name])
2109         elif field == "pinst_cnt":
2110           val = len(node_to_primary[node.name])
2111         elif field == "sinst_cnt":
2112           val = len(node_to_secondary[node.name])
2113         elif field == "pip":
2114           val = node.primary_ip
2115         elif field == "sip":
2116           val = node.secondary_ip
2117         elif field == "tags":
2118           val = list(node.GetTags())
2119         elif field == "serial_no":
2120           val = node.serial_no
2121         elif field == "master_candidate":
2122           val = node.master_candidate
2123         elif field == "master":
2124           val = node.name == master_node
2125         elif field == "offline":
2126           val = node.offline
2127         elif field == "drained":
2128           val = node.drained
2129         elif self._FIELDS_DYNAMIC.Matches(field):
2130           val = live_data[node.name].get(field, None)
2131         elif field == "role":
2132           if node.name == master_node:
2133             val = "M"
2134           elif node.master_candidate:
2135             val = "C"
2136           elif node.drained:
2137             val = "D"
2138           elif node.offline:
2139             val = "O"
2140           else:
2141             val = "R"
2142         else:
2143           raise errors.ParameterError(field)
2144         node_output.append(val)
2145       output.append(node_output)
2146
2147     return output
2148
2149
2150 class LUQueryNodeVolumes(NoHooksLU):
2151   """Logical unit for getting volumes on node(s).
2152
2153   """
2154   _OP_REQP = ["nodes", "output_fields"]
2155   REQ_BGL = False
2156   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2157   _FIELDS_STATIC = utils.FieldSet("node")
2158
2159   def ExpandNames(self):
2160     _CheckOutputFields(static=self._FIELDS_STATIC,
2161                        dynamic=self._FIELDS_DYNAMIC,
2162                        selected=self.op.output_fields)
2163
2164     self.needed_locks = {}
2165     self.share_locks[locking.LEVEL_NODE] = 1
2166     if not self.op.nodes:
2167       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2168     else:
2169       self.needed_locks[locking.LEVEL_NODE] = \
2170         _GetWantedNodes(self, self.op.nodes)
2171
2172   def CheckPrereq(self):
2173     """Check prerequisites.
2174
2175     This checks that the fields required are valid output fields.
2176
2177     """
2178     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2179
2180   def Exec(self, feedback_fn):
2181     """Computes the list of nodes and their attributes.
2182
2183     """
2184     nodenames = self.nodes
2185     volumes = self.rpc.call_node_volumes(nodenames)
2186
2187     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2188              in self.cfg.GetInstanceList()]
2189
2190     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2191
2192     output = []
2193     for node in nodenames:
2194       if node not in volumes or volumes[node].failed or not volumes[node].data:
2195         continue
2196
2197       node_vols = volumes[node].data[:]
2198       node_vols.sort(key=lambda vol: vol['dev'])
2199
2200       for vol in node_vols:
2201         node_output = []
2202         for field in self.op.output_fields:
2203           if field == "node":
2204             val = node
2205           elif field == "phys":
2206             val = vol['dev']
2207           elif field == "vg":
2208             val = vol['vg']
2209           elif field == "name":
2210             val = vol['name']
2211           elif field == "size":
2212             val = int(float(vol['size']))
2213           elif field == "instance":
2214             for inst in ilist:
2215               if node not in lv_by_node[inst]:
2216                 continue
2217               if vol['name'] in lv_by_node[inst][node]:
2218                 val = inst.name
2219                 break
2220             else:
2221               val = '-'
2222           else:
2223             raise errors.ParameterError(field)
2224           node_output.append(str(val))
2225
2226         output.append(node_output)
2227
2228     return output
2229
2230
2231 class LUAddNode(LogicalUnit):
2232   """Logical unit for adding node to the cluster.
2233
2234   """
2235   HPATH = "node-add"
2236   HTYPE = constants.HTYPE_NODE
2237   _OP_REQP = ["node_name"]
2238
2239   def BuildHooksEnv(self):
2240     """Build hooks env.
2241
2242     This will run on all nodes before, and on all nodes + the new node after.
2243
2244     """
2245     env = {
2246       "OP_TARGET": self.op.node_name,
2247       "NODE_NAME": self.op.node_name,
2248       "NODE_PIP": self.op.primary_ip,
2249       "NODE_SIP": self.op.secondary_ip,
2250       }
2251     nodes_0 = self.cfg.GetNodeList()
2252     nodes_1 = nodes_0 + [self.op.node_name, ]
2253     return env, nodes_0, nodes_1
2254
2255   def CheckPrereq(self):
2256     """Check prerequisites.
2257
2258     This checks:
2259      - the new node is not already in the config
2260      - it is resolvable
2261      - its parameters (single/dual homed) matches the cluster
2262
2263     Any errors are signaled by raising errors.OpPrereqError.
2264
2265     """
2266     node_name = self.op.node_name
2267     cfg = self.cfg
2268
2269     dns_data = utils.HostInfo(node_name)
2270
2271     node = dns_data.name
2272     primary_ip = self.op.primary_ip = dns_data.ip
2273     secondary_ip = getattr(self.op, "secondary_ip", None)
2274     if secondary_ip is None:
2275       secondary_ip = primary_ip
2276     if not utils.IsValidIP(secondary_ip):
2277       raise errors.OpPrereqError("Invalid secondary IP given")
2278     self.op.secondary_ip = secondary_ip
2279
2280     node_list = cfg.GetNodeList()
2281     if not self.op.readd and node in node_list:
2282       raise errors.OpPrereqError("Node %s is already in the configuration" %
2283                                  node)
2284     elif self.op.readd and node not in node_list:
2285       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2286
2287     for existing_node_name in node_list:
2288       existing_node = cfg.GetNodeInfo(existing_node_name)
2289
2290       if self.op.readd and node == existing_node_name:
2291         if (existing_node.primary_ip != primary_ip or
2292             existing_node.secondary_ip != secondary_ip):
2293           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2294                                      " address configuration as before")
2295         continue
2296
2297       if (existing_node.primary_ip == primary_ip or
2298           existing_node.secondary_ip == primary_ip or
2299           existing_node.primary_ip == secondary_ip or
2300           existing_node.secondary_ip == secondary_ip):
2301         raise errors.OpPrereqError("New node ip address(es) conflict with"
2302                                    " existing node %s" % existing_node.name)
2303
2304     # check that the type of the node (single versus dual homed) is the
2305     # same as for the master
2306     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2307     master_singlehomed = myself.secondary_ip == myself.primary_ip
2308     newbie_singlehomed = secondary_ip == primary_ip
2309     if master_singlehomed != newbie_singlehomed:
2310       if master_singlehomed:
2311         raise errors.OpPrereqError("The master has no private ip but the"
2312                                    " new node has one")
2313       else:
2314         raise errors.OpPrereqError("The master has a private ip but the"
2315                                    " new node doesn't have one")
2316
2317     # checks reachability
2318     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2319       raise errors.OpPrereqError("Node not reachable by ping")
2320
2321     if not newbie_singlehomed:
2322       # check reachability from my secondary ip to newbie's secondary ip
2323       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2324                            source=myself.secondary_ip):
2325         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2326                                    " based ping to noded port")
2327
2328     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2329     if self.op.readd:
2330       exceptions = [node]
2331     else:
2332       exceptions = []
2333     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2334     # the new node will increase mc_max with one, so:
2335     mc_max = min(mc_max + 1, cp_size)
2336     self.master_candidate = mc_now < mc_max
2337
2338     if self.op.readd:
2339       self.new_node = self.cfg.GetNodeInfo(node)
2340       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2341     else:
2342       self.new_node = objects.Node(name=node,
2343                                    primary_ip=primary_ip,
2344                                    secondary_ip=secondary_ip,
2345                                    master_candidate=self.master_candidate,
2346                                    offline=False, drained=False)
2347
2348   def Exec(self, feedback_fn):
2349     """Adds the new node to the cluster.
2350
2351     """
2352     new_node = self.new_node
2353     node = new_node.name
2354
2355     # for re-adds, reset the offline/drained/master-candidate flags;
2356     # we need to reset here, otherwise offline would prevent RPC calls
2357     # later in the procedure; this also means that if the re-add
2358     # fails, we are left with a non-offlined, broken node
2359     if self.op.readd:
2360       new_node.drained = new_node.offline = False
2361       self.LogInfo("Readding a node, the offline/drained flags were reset")
2362       # if we demote the node, we do cleanup later in the procedure
2363       new_node.master_candidate = self.master_candidate
2364
2365     # notify the user about any possible mc promotion
2366     if new_node.master_candidate:
2367       self.LogInfo("Node will be a master candidate")
2368
2369     # check connectivity
2370     result = self.rpc.call_version([node])[node]
2371     result.Raise()
2372     if result.data:
2373       if constants.PROTOCOL_VERSION == result.data:
2374         logging.info("Communication to node %s fine, sw version %s match",
2375                      node, result.data)
2376       else:
2377         raise errors.OpExecError("Version mismatch master version %s,"
2378                                  " node version %s" %
2379                                  (constants.PROTOCOL_VERSION, result.data))
2380     else:
2381       raise errors.OpExecError("Cannot get version from the new node")
2382
2383     # setup ssh on node
2384     logging.info("Copy ssh key to node %s", node)
2385     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2386     keyarray = []
2387     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2388                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2389                 priv_key, pub_key]
2390
2391     for i in keyfiles:
2392       f = open(i, 'r')
2393       try:
2394         keyarray.append(f.read())
2395       finally:
2396         f.close()
2397
2398     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2399                                     keyarray[2],
2400                                     keyarray[3], keyarray[4], keyarray[5])
2401
2402     msg = result.RemoteFailMsg()
2403     if msg:
2404       raise errors.OpExecError("Cannot transfer ssh keys to the"
2405                                " new node: %s" % msg)
2406
2407     # Add node to our /etc/hosts, and add key to known_hosts
2408     if self.cfg.GetClusterInfo().modify_etc_hosts:
2409       utils.AddHostToEtcHosts(new_node.name)
2410
2411     if new_node.secondary_ip != new_node.primary_ip:
2412       result = self.rpc.call_node_has_ip_address(new_node.name,
2413                                                  new_node.secondary_ip)
2414       if result.failed or not result.data:
2415         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2416                                  " you gave (%s). Please fix and re-run this"
2417                                  " command." % new_node.secondary_ip)
2418
2419     node_verify_list = [self.cfg.GetMasterNode()]
2420     node_verify_param = {
2421       'nodelist': [node],
2422       # TODO: do a node-net-test as well?
2423     }
2424
2425     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2426                                        self.cfg.GetClusterName())
2427     for verifier in node_verify_list:
2428       if result[verifier].failed or not result[verifier].data:
2429         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2430                                  " for remote verification" % verifier)
2431       if result[verifier].data['nodelist']:
2432         for failed in result[verifier].data['nodelist']:
2433           feedback_fn("ssh/hostname verification failed"
2434                       " (checking from %s): %s" %
2435                       (verifier, result[verifier].data['nodelist'][failed]))
2436         raise errors.OpExecError("ssh/hostname verification failed.")
2437
2438     # Distribute updated /etc/hosts and known_hosts to all nodes,
2439     # including the node just added
2440     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2441     dist_nodes = self.cfg.GetNodeList()
2442     if not self.op.readd:
2443       dist_nodes.append(node)
2444     if myself.name in dist_nodes:
2445       dist_nodes.remove(myself.name)
2446
2447     logging.debug("Copying hosts and known_hosts to all nodes")
2448     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2449       result = self.rpc.call_upload_file(dist_nodes, fname)
2450       for to_node, to_result in result.iteritems():
2451         if to_result.failed or not to_result.data:
2452           logging.error("Copy of file %s to node %s failed", fname, to_node)
2453
2454     to_copy = []
2455     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2456     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2457       to_copy.append(constants.VNC_PASSWORD_FILE)
2458
2459     for fname in to_copy:
2460       result = self.rpc.call_upload_file([node], fname)
2461       if result[node].failed or not result[node]:
2462         logging.error("Could not copy file %s to node %s", fname, node)
2463
2464     if self.op.readd:
2465       self.context.ReaddNode(new_node)
2466       # make sure we redistribute the config
2467       self.cfg.Update(new_node)
2468       # and make sure the new node will not have old files around
2469       if not new_node.master_candidate:
2470         result = self.rpc.call_node_demote_from_mc(new_node.name)
2471         msg = result.RemoteFailMsg()
2472         if msg:
2473           self.LogWarning("Node failed to demote itself from master"
2474                           " candidate status: %s" % msg)
2475     else:
2476       self.context.AddNode(new_node)
2477
2478
2479 class LUSetNodeParams(LogicalUnit):
2480   """Modifies the parameters of a node.
2481
2482   """
2483   HPATH = "node-modify"
2484   HTYPE = constants.HTYPE_NODE
2485   _OP_REQP = ["node_name"]
2486   REQ_BGL = False
2487
2488   def CheckArguments(self):
2489     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2490     if node_name is None:
2491       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2492     self.op.node_name = node_name
2493     _CheckBooleanOpField(self.op, 'master_candidate')
2494     _CheckBooleanOpField(self.op, 'offline')
2495     _CheckBooleanOpField(self.op, 'drained')
2496     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2497     if all_mods.count(None) == 3:
2498       raise errors.OpPrereqError("Please pass at least one modification")
2499     if all_mods.count(True) > 1:
2500       raise errors.OpPrereqError("Can't set the node into more than one"
2501                                  " state at the same time")
2502
2503   def ExpandNames(self):
2504     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2505
2506   def BuildHooksEnv(self):
2507     """Build hooks env.
2508
2509     This runs on the master node.
2510
2511     """
2512     env = {
2513       "OP_TARGET": self.op.node_name,
2514       "MASTER_CANDIDATE": str(self.op.master_candidate),
2515       "OFFLINE": str(self.op.offline),
2516       "DRAINED": str(self.op.drained),
2517       }
2518     nl = [self.cfg.GetMasterNode(),
2519           self.op.node_name]
2520     return env, nl, nl
2521
2522   def CheckPrereq(self):
2523     """Check prerequisites.
2524
2525     This only checks the instance list against the existing names.
2526
2527     """
2528     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2529
2530     if (self.op.master_candidate is not None or
2531         self.op.drained is not None or
2532         self.op.offline is not None):
2533       # we can't change the master's node flags
2534       if self.op.node_name == self.cfg.GetMasterNode():
2535         raise errors.OpPrereqError("The master role can be changed"
2536                                    " only via masterfailover")
2537
2538     if ((self.op.master_candidate == False or self.op.offline == True or
2539          self.op.drained == True) and node.master_candidate):
2540       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2541       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2542       if num_candidates <= cp_size:
2543         msg = ("Not enough master candidates (desired"
2544                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2545         if self.op.force:
2546           self.LogWarning(msg)
2547         else:
2548           raise errors.OpPrereqError(msg)
2549
2550     if (self.op.master_candidate == True and
2551         ((node.offline and not self.op.offline == False) or
2552          (node.drained and not self.op.drained == False))):
2553       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2554                                  " to master_candidate" % node.name)
2555
2556     return
2557
2558   def Exec(self, feedback_fn):
2559     """Modifies a node.
2560
2561     """
2562     node = self.node
2563
2564     result = []
2565     changed_mc = False
2566
2567     if self.op.offline is not None:
2568       node.offline = self.op.offline
2569       result.append(("offline", str(self.op.offline)))
2570       if self.op.offline == True:
2571         if node.master_candidate:
2572           node.master_candidate = False
2573           changed_mc = True
2574           result.append(("master_candidate", "auto-demotion due to offline"))
2575         if node.drained:
2576           node.drained = False
2577           result.append(("drained", "clear drained status due to offline"))
2578
2579     if self.op.master_candidate is not None:
2580       node.master_candidate = self.op.master_candidate
2581       changed_mc = True
2582       result.append(("master_candidate", str(self.op.master_candidate)))
2583       if self.op.master_candidate == False:
2584         rrc = self.rpc.call_node_demote_from_mc(node.name)
2585         msg = rrc.RemoteFailMsg()
2586         if msg:
2587           self.LogWarning("Node failed to demote itself: %s" % msg)
2588
2589     if self.op.drained is not None:
2590       node.drained = self.op.drained
2591       result.append(("drained", str(self.op.drained)))
2592       if self.op.drained == True:
2593         if node.master_candidate:
2594           node.master_candidate = False
2595           changed_mc = True
2596           result.append(("master_candidate", "auto-demotion due to drain"))
2597           rrc = self.rpc.call_node_demote_from_mc(node.name)
2598           msg = rrc.RemoteFailMsg()
2599           if msg:
2600             self.LogWarning("Node failed to demote itself: %s" % msg)
2601         if node.offline:
2602           node.offline = False
2603           result.append(("offline", "clear offline status due to drain"))
2604
2605     # this will trigger configuration file update, if needed
2606     self.cfg.Update(node)
2607     # this will trigger job queue propagation or cleanup
2608     if changed_mc:
2609       self.context.ReaddNode(node)
2610
2611     return result
2612
2613
2614 class LUQueryClusterInfo(NoHooksLU):
2615   """Query cluster configuration.
2616
2617   """
2618   _OP_REQP = []
2619   REQ_BGL = False
2620
2621   def ExpandNames(self):
2622     self.needed_locks = {}
2623
2624   def CheckPrereq(self):
2625     """No prerequsites needed for this LU.
2626
2627     """
2628     pass
2629
2630   def Exec(self, feedback_fn):
2631     """Return cluster config.
2632
2633     """
2634     cluster = self.cfg.GetClusterInfo()
2635     result = {
2636       "software_version": constants.RELEASE_VERSION,
2637       "protocol_version": constants.PROTOCOL_VERSION,
2638       "config_version": constants.CONFIG_VERSION,
2639       "os_api_version": constants.OS_API_VERSION,
2640       "export_version": constants.EXPORT_VERSION,
2641       "architecture": (platform.architecture()[0], platform.machine()),
2642       "name": cluster.cluster_name,
2643       "master": cluster.master_node,
2644       "default_hypervisor": cluster.default_hypervisor,
2645       "enabled_hypervisors": cluster.enabled_hypervisors,
2646       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2647                         for hypervisor_name in cluster.enabled_hypervisors]),
2648       "beparams": cluster.beparams,
2649       "candidate_pool_size": cluster.candidate_pool_size,
2650       "default_bridge": cluster.default_bridge,
2651       "master_netdev": cluster.master_netdev,
2652       "volume_group_name": cluster.volume_group_name,
2653       "file_storage_dir": cluster.file_storage_dir,
2654       "tags": list(cluster.GetTags()),
2655       }
2656
2657     return result
2658
2659
2660 class LUQueryConfigValues(NoHooksLU):
2661   """Return configuration values.
2662
2663   """
2664   _OP_REQP = []
2665   REQ_BGL = False
2666   _FIELDS_DYNAMIC = utils.FieldSet()
2667   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2668
2669   def ExpandNames(self):
2670     self.needed_locks = {}
2671
2672     _CheckOutputFields(static=self._FIELDS_STATIC,
2673                        dynamic=self._FIELDS_DYNAMIC,
2674                        selected=self.op.output_fields)
2675
2676   def CheckPrereq(self):
2677     """No prerequisites.
2678
2679     """
2680     pass
2681
2682   def Exec(self, feedback_fn):
2683     """Dump a representation of the cluster config to the standard output.
2684
2685     """
2686     values = []
2687     for field in self.op.output_fields:
2688       if field == "cluster_name":
2689         entry = self.cfg.GetClusterName()
2690       elif field == "master_node":
2691         entry = self.cfg.GetMasterNode()
2692       elif field == "drain_flag":
2693         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2694       else:
2695         raise errors.ParameterError(field)
2696       values.append(entry)
2697     return values
2698
2699
2700 class LUActivateInstanceDisks(NoHooksLU):
2701   """Bring up an instance's disks.
2702
2703   """
2704   _OP_REQP = ["instance_name"]
2705   REQ_BGL = False
2706
2707   def ExpandNames(self):
2708     self._ExpandAndLockInstance()
2709     self.needed_locks[locking.LEVEL_NODE] = []
2710     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2711
2712   def DeclareLocks(self, level):
2713     if level == locking.LEVEL_NODE:
2714       self._LockInstancesNodes()
2715
2716   def CheckPrereq(self):
2717     """Check prerequisites.
2718
2719     This checks that the instance is in the cluster.
2720
2721     """
2722     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2723     assert self.instance is not None, \
2724       "Cannot retrieve locked instance %s" % self.op.instance_name
2725     _CheckNodeOnline(self, self.instance.primary_node)
2726     if not hasattr(self.op, "ignore_size"):
2727       self.op.ignore_size = False
2728
2729   def Exec(self, feedback_fn):
2730     """Activate the disks.
2731
2732     """
2733     disks_ok, disks_info = \
2734               _AssembleInstanceDisks(self, self.instance,
2735                                      ignore_size=self.op.ignore_size)
2736     if not disks_ok:
2737       raise errors.OpExecError("Cannot activate block devices")
2738
2739     return disks_info
2740
2741
2742 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2743                            ignore_size=False):
2744   """Prepare the block devices for an instance.
2745
2746   This sets up the block devices on all nodes.
2747
2748   @type lu: L{LogicalUnit}
2749   @param lu: the logical unit on whose behalf we execute
2750   @type instance: L{objects.Instance}
2751   @param instance: the instance for whose disks we assemble
2752   @type ignore_secondaries: boolean
2753   @param ignore_secondaries: if true, errors on secondary nodes
2754       won't result in an error return from the function
2755   @type ignore_size: boolean
2756   @param ignore_size: if true, the current known size of the disk
2757       will not be used during the disk activation, useful for cases
2758       when the size is wrong
2759   @return: False if the operation failed, otherwise a list of
2760       (host, instance_visible_name, node_visible_name)
2761       with the mapping from node devices to instance devices
2762
2763   """
2764   device_info = []
2765   disks_ok = True
2766   iname = instance.name
2767   # With the two passes mechanism we try to reduce the window of
2768   # opportunity for the race condition of switching DRBD to primary
2769   # before handshaking occured, but we do not eliminate it
2770
2771   # The proper fix would be to wait (with some limits) until the
2772   # connection has been made and drbd transitions from WFConnection
2773   # into any other network-connected state (Connected, SyncTarget,
2774   # SyncSource, etc.)
2775
2776   # 1st pass, assemble on all nodes in secondary mode
2777   for inst_disk in instance.disks:
2778     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2779       if ignore_size:
2780         node_disk = node_disk.Copy()
2781         node_disk.UnsetSize()
2782       lu.cfg.SetDiskID(node_disk, node)
2783       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2784       msg = result.RemoteFailMsg()
2785       if msg:
2786         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2787                            " (is_primary=False, pass=1): %s",
2788                            inst_disk.iv_name, node, msg)
2789         if not ignore_secondaries:
2790           disks_ok = False
2791
2792   # FIXME: race condition on drbd migration to primary
2793
2794   # 2nd pass, do only the primary node
2795   for inst_disk in instance.disks:
2796     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2797       if node != instance.primary_node:
2798         continue
2799       if ignore_size:
2800         node_disk = node_disk.Copy()
2801         node_disk.UnsetSize()
2802       lu.cfg.SetDiskID(node_disk, node)
2803       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2804       msg = result.RemoteFailMsg()
2805       if msg:
2806         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2807                            " (is_primary=True, pass=2): %s",
2808                            inst_disk.iv_name, node, msg)
2809         disks_ok = False
2810     device_info.append((instance.primary_node, inst_disk.iv_name,
2811                         result.payload))
2812
2813   # leave the disks configured for the primary node
2814   # this is a workaround that would be fixed better by
2815   # improving the logical/physical id handling
2816   for disk in instance.disks:
2817     lu.cfg.SetDiskID(disk, instance.primary_node)
2818
2819   return disks_ok, device_info
2820
2821
2822 def _StartInstanceDisks(lu, instance, force):
2823   """Start the disks of an instance.
2824
2825   """
2826   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2827                                            ignore_secondaries=force)
2828   if not disks_ok:
2829     _ShutdownInstanceDisks(lu, instance)
2830     if force is not None and not force:
2831       lu.proc.LogWarning("", hint="If the message above refers to a"
2832                          " secondary node,"
2833                          " you can retry the operation using '--force'.")
2834     raise errors.OpExecError("Disk consistency error")
2835
2836
2837 class LUDeactivateInstanceDisks(NoHooksLU):
2838   """Shutdown an instance's disks.
2839
2840   """
2841   _OP_REQP = ["instance_name"]
2842   REQ_BGL = False
2843
2844   def ExpandNames(self):
2845     self._ExpandAndLockInstance()
2846     self.needed_locks[locking.LEVEL_NODE] = []
2847     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2848
2849   def DeclareLocks(self, level):
2850     if level == locking.LEVEL_NODE:
2851       self._LockInstancesNodes()
2852
2853   def CheckPrereq(self):
2854     """Check prerequisites.
2855
2856     This checks that the instance is in the cluster.
2857
2858     """
2859     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2860     assert self.instance is not None, \
2861       "Cannot retrieve locked instance %s" % self.op.instance_name
2862
2863   def Exec(self, feedback_fn):
2864     """Deactivate the disks
2865
2866     """
2867     instance = self.instance
2868     _SafeShutdownInstanceDisks(self, instance)
2869
2870
2871 def _SafeShutdownInstanceDisks(lu, instance):
2872   """Shutdown block devices of an instance.
2873
2874   This function checks if an instance is running, before calling
2875   _ShutdownInstanceDisks.
2876
2877   """
2878   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2879                                       [instance.hypervisor])
2880   ins_l = ins_l[instance.primary_node]
2881   if ins_l.failed or not isinstance(ins_l.data, list):
2882     raise errors.OpExecError("Can't contact node '%s'" %
2883                              instance.primary_node)
2884
2885   if instance.name in ins_l.data:
2886     raise errors.OpExecError("Instance is running, can't shutdown"
2887                              " block devices.")
2888
2889   _ShutdownInstanceDisks(lu, instance)
2890
2891
2892 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2893   """Shutdown block devices of an instance.
2894
2895   This does the shutdown on all nodes of the instance.
2896
2897   If the ignore_primary is false, errors on the primary node are
2898   ignored.
2899
2900   """
2901   all_result = True
2902   for disk in instance.disks:
2903     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2904       lu.cfg.SetDiskID(top_disk, node)
2905       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2906       msg = result.RemoteFailMsg()
2907       if msg:
2908         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2909                       disk.iv_name, node, msg)
2910         if not ignore_primary or node != instance.primary_node:
2911           all_result = False
2912   return all_result
2913
2914
2915 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2916   """Checks if a node has enough free memory.
2917
2918   This function check if a given node has the needed amount of free
2919   memory. In case the node has less memory or we cannot get the
2920   information from the node, this function raise an OpPrereqError
2921   exception.
2922
2923   @type lu: C{LogicalUnit}
2924   @param lu: a logical unit from which we get configuration data
2925   @type node: C{str}
2926   @param node: the node to check
2927   @type reason: C{str}
2928   @param reason: string to use in the error message
2929   @type requested: C{int}
2930   @param requested: the amount of memory in MiB to check for
2931   @type hypervisor_name: C{str}
2932   @param hypervisor_name: the hypervisor to ask for memory stats
2933   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2934       we cannot check the node
2935
2936   """
2937   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2938   nodeinfo[node].Raise()
2939   free_mem = nodeinfo[node].data.get('memory_free')
2940   if not isinstance(free_mem, int):
2941     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2942                              " was '%s'" % (node, free_mem))
2943   if requested > free_mem:
2944     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2945                              " needed %s MiB, available %s MiB" %
2946                              (node, reason, requested, free_mem))
2947
2948
2949 class LUStartupInstance(LogicalUnit):
2950   """Starts an instance.
2951
2952   """
2953   HPATH = "instance-start"
2954   HTYPE = constants.HTYPE_INSTANCE
2955   _OP_REQP = ["instance_name", "force"]
2956   REQ_BGL = False
2957
2958   def ExpandNames(self):
2959     self._ExpandAndLockInstance()
2960
2961   def BuildHooksEnv(self):
2962     """Build hooks env.
2963
2964     This runs on master, primary and secondary nodes of the instance.
2965
2966     """
2967     env = {
2968       "FORCE": self.op.force,
2969       }
2970     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2971     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2972     return env, nl, nl
2973
2974   def CheckPrereq(self):
2975     """Check prerequisites.
2976
2977     This checks that the instance is in the cluster.
2978
2979     """
2980     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2981     assert self.instance is not None, \
2982       "Cannot retrieve locked instance %s" % self.op.instance_name
2983
2984     # extra beparams
2985     self.beparams = getattr(self.op, "beparams", {})
2986     if self.beparams:
2987       if not isinstance(self.beparams, dict):
2988         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2989                                    " dict" % (type(self.beparams), ))
2990       # fill the beparams dict
2991       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2992       self.op.beparams = self.beparams
2993
2994     # extra hvparams
2995     self.hvparams = getattr(self.op, "hvparams", {})
2996     if self.hvparams:
2997       if not isinstance(self.hvparams, dict):
2998         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2999                                    " dict" % (type(self.hvparams), ))
3000
3001       # check hypervisor parameter syntax (locally)
3002       cluster = self.cfg.GetClusterInfo()
3003       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3004       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
3005                                     instance.hvparams)
3006       filled_hvp.update(self.hvparams)
3007       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3008       hv_type.CheckParameterSyntax(filled_hvp)
3009       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3010       self.op.hvparams = self.hvparams
3011
3012     _CheckNodeOnline(self, instance.primary_node)
3013
3014     bep = self.cfg.GetClusterInfo().FillBE(instance)
3015     # check bridges existence
3016     _CheckInstanceBridgesExist(self, instance)
3017
3018     remote_info = self.rpc.call_instance_info(instance.primary_node,
3019                                               instance.name,
3020                                               instance.hypervisor)
3021     remote_info.Raise()
3022     if not remote_info.data:
3023       _CheckNodeFreeMemory(self, instance.primary_node,
3024                            "starting instance %s" % instance.name,
3025                            bep[constants.BE_MEMORY], instance.hypervisor)
3026
3027   def Exec(self, feedback_fn):
3028     """Start the instance.
3029
3030     """
3031     instance = self.instance
3032     force = self.op.force
3033
3034     self.cfg.MarkInstanceUp(instance.name)
3035
3036     node_current = instance.primary_node
3037
3038     _StartInstanceDisks(self, instance, force)
3039
3040     result = self.rpc.call_instance_start(node_current, instance,
3041                                           self.hvparams, self.beparams)
3042     msg = result.RemoteFailMsg()
3043     if msg:
3044       _ShutdownInstanceDisks(self, instance)
3045       raise errors.OpExecError("Could not start instance: %s" % msg)
3046
3047
3048 class LURebootInstance(LogicalUnit):
3049   """Reboot an instance.
3050
3051   """
3052   HPATH = "instance-reboot"
3053   HTYPE = constants.HTYPE_INSTANCE
3054   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3055   REQ_BGL = False
3056
3057   def ExpandNames(self):
3058     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3059                                    constants.INSTANCE_REBOOT_HARD,
3060                                    constants.INSTANCE_REBOOT_FULL]:
3061       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3062                                   (constants.INSTANCE_REBOOT_SOFT,
3063                                    constants.INSTANCE_REBOOT_HARD,
3064                                    constants.INSTANCE_REBOOT_FULL))
3065     self._ExpandAndLockInstance()
3066
3067   def BuildHooksEnv(self):
3068     """Build hooks env.
3069
3070     This runs on master, primary and secondary nodes of the instance.
3071
3072     """
3073     env = {
3074       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3075       "REBOOT_TYPE": self.op.reboot_type,
3076       }
3077     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3078     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3079     return env, nl, nl
3080
3081   def CheckPrereq(self):
3082     """Check prerequisites.
3083
3084     This checks that the instance is in the cluster.
3085
3086     """
3087     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3088     assert self.instance is not None, \
3089       "Cannot retrieve locked instance %s" % self.op.instance_name
3090
3091     _CheckNodeOnline(self, instance.primary_node)
3092
3093     # check bridges existence
3094     _CheckInstanceBridgesExist(self, instance)
3095
3096   def Exec(self, feedback_fn):
3097     """Reboot the instance.
3098
3099     """
3100     instance = self.instance
3101     ignore_secondaries = self.op.ignore_secondaries
3102     reboot_type = self.op.reboot_type
3103
3104     node_current = instance.primary_node
3105
3106     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3107                        constants.INSTANCE_REBOOT_HARD]:
3108       for disk in instance.disks:
3109         self.cfg.SetDiskID(disk, node_current)
3110       result = self.rpc.call_instance_reboot(node_current, instance,
3111                                              reboot_type)
3112       msg = result.RemoteFailMsg()
3113       if msg:
3114         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3115     else:
3116       result = self.rpc.call_instance_shutdown(node_current, instance)
3117       msg = result.RemoteFailMsg()
3118       if msg:
3119         raise errors.OpExecError("Could not shutdown instance for"
3120                                  " full reboot: %s" % msg)
3121       _ShutdownInstanceDisks(self, instance)
3122       _StartInstanceDisks(self, instance, ignore_secondaries)
3123       result = self.rpc.call_instance_start(node_current, instance, None, None)
3124       msg = result.RemoteFailMsg()
3125       if msg:
3126         _ShutdownInstanceDisks(self, instance)
3127         raise errors.OpExecError("Could not start instance for"
3128                                  " full reboot: %s" % msg)
3129
3130     self.cfg.MarkInstanceUp(instance.name)
3131
3132
3133 class LUShutdownInstance(LogicalUnit):
3134   """Shutdown an instance.
3135
3136   """
3137   HPATH = "instance-stop"
3138   HTYPE = constants.HTYPE_INSTANCE
3139   _OP_REQP = ["instance_name"]
3140   REQ_BGL = False
3141
3142   def ExpandNames(self):
3143     self._ExpandAndLockInstance()
3144
3145   def BuildHooksEnv(self):
3146     """Build hooks env.
3147
3148     This runs on master, primary and secondary nodes of the instance.
3149
3150     """
3151     env = _BuildInstanceHookEnvByObject(self, self.instance)
3152     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3153     return env, nl, nl
3154
3155   def CheckPrereq(self):
3156     """Check prerequisites.
3157
3158     This checks that the instance is in the cluster.
3159
3160     """
3161     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3162     assert self.instance is not None, \
3163       "Cannot retrieve locked instance %s" % self.op.instance_name
3164     _CheckNodeOnline(self, self.instance.primary_node)
3165
3166   def Exec(self, feedback_fn):
3167     """Shutdown the instance.
3168
3169     """
3170     instance = self.instance
3171     node_current = instance.primary_node
3172     self.cfg.MarkInstanceDown(instance.name)
3173     result = self.rpc.call_instance_shutdown(node_current, instance)
3174     msg = result.RemoteFailMsg()
3175     if msg:
3176       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3177
3178     _ShutdownInstanceDisks(self, instance)
3179
3180
3181 class LUReinstallInstance(LogicalUnit):
3182   """Reinstall an instance.
3183
3184   """
3185   HPATH = "instance-reinstall"
3186   HTYPE = constants.HTYPE_INSTANCE
3187   _OP_REQP = ["instance_name"]
3188   REQ_BGL = False
3189
3190   def ExpandNames(self):
3191     self._ExpandAndLockInstance()
3192
3193   def BuildHooksEnv(self):
3194     """Build hooks env.
3195
3196     This runs on master, primary and secondary nodes of the instance.
3197
3198     """
3199     env = _BuildInstanceHookEnvByObject(self, self.instance)
3200     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3201     return env, nl, nl
3202
3203   def CheckPrereq(self):
3204     """Check prerequisites.
3205
3206     This checks that the instance is in the cluster and is not running.
3207
3208     """
3209     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3210     assert instance is not None, \
3211       "Cannot retrieve locked instance %s" % self.op.instance_name
3212     _CheckNodeOnline(self, instance.primary_node)
3213
3214     if instance.disk_template == constants.DT_DISKLESS:
3215       raise errors.OpPrereqError("Instance '%s' has no disks" %
3216                                  self.op.instance_name)
3217     if instance.admin_up:
3218       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3219                                  self.op.instance_name)
3220     remote_info = self.rpc.call_instance_info(instance.primary_node,
3221                                               instance.name,
3222                                               instance.hypervisor)
3223     remote_info.Raise()
3224     if remote_info.data:
3225       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3226                                  (self.op.instance_name,
3227                                   instance.primary_node))
3228
3229     self.op.os_type = getattr(self.op, "os_type", None)
3230     if self.op.os_type is not None:
3231       # OS verification
3232       pnode = self.cfg.GetNodeInfo(
3233         self.cfg.ExpandNodeName(instance.primary_node))
3234       if pnode is None:
3235         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3236                                    self.op.pnode)
3237       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3238       result.Raise()
3239       if not isinstance(result.data, objects.OS):
3240         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3241                                    " primary node"  % self.op.os_type)
3242
3243     self.instance = instance
3244
3245   def Exec(self, feedback_fn):
3246     """Reinstall the instance.
3247
3248     """
3249     inst = self.instance
3250
3251     if self.op.os_type is not None:
3252       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3253       inst.os = self.op.os_type
3254       self.cfg.Update(inst)
3255
3256     _StartInstanceDisks(self, inst, None)
3257     try:
3258       feedback_fn("Running the instance OS create scripts...")
3259       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3260       msg = result.RemoteFailMsg()
3261       if msg:
3262         raise errors.OpExecError("Could not install OS for instance %s"
3263                                  " on node %s: %s" %
3264                                  (inst.name, inst.primary_node, msg))
3265     finally:
3266       _ShutdownInstanceDisks(self, inst)
3267
3268
3269 class LURenameInstance(LogicalUnit):
3270   """Rename an instance.
3271
3272   """
3273   HPATH = "instance-rename"
3274   HTYPE = constants.HTYPE_INSTANCE
3275   _OP_REQP = ["instance_name", "new_name"]
3276
3277   def BuildHooksEnv(self):
3278     """Build hooks env.
3279
3280     This runs on master, primary and secondary nodes of the instance.
3281
3282     """
3283     env = _BuildInstanceHookEnvByObject(self, self.instance)
3284     env["INSTANCE_NEW_NAME"] = self.op.new_name
3285     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3286     return env, nl, nl
3287
3288   def CheckPrereq(self):
3289     """Check prerequisites.
3290
3291     This checks that the instance is in the cluster and is not running.
3292
3293     """
3294     instance = self.cfg.GetInstanceInfo(
3295       self.cfg.ExpandInstanceName(self.op.instance_name))
3296     if instance is None:
3297       raise errors.OpPrereqError("Instance '%s' not known" %
3298                                  self.op.instance_name)
3299     _CheckNodeOnline(self, instance.primary_node)
3300
3301     if instance.admin_up:
3302       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3303                                  self.op.instance_name)
3304     remote_info = self.rpc.call_instance_info(instance.primary_node,
3305                                               instance.name,
3306                                               instance.hypervisor)
3307     remote_info.Raise()
3308     if remote_info.data:
3309       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3310                                  (self.op.instance_name,
3311                                   instance.primary_node))
3312     self.instance = instance
3313
3314     # new name verification
3315     name_info = utils.HostInfo(self.op.new_name)
3316
3317     self.op.new_name = new_name = name_info.name
3318     instance_list = self.cfg.GetInstanceList()
3319     if new_name in instance_list:
3320       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3321                                  new_name)
3322
3323     if not getattr(self.op, "ignore_ip", False):
3324       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3325         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3326                                    (name_info.ip, new_name))
3327
3328
3329   def Exec(self, feedback_fn):
3330     """Reinstall the instance.
3331
3332     """
3333     inst = self.instance
3334     old_name = inst.name
3335
3336     if inst.disk_template == constants.DT_FILE:
3337       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3338
3339     self.cfg.RenameInstance(inst.name, self.op.new_name)
3340     # Change the instance lock. This is definitely safe while we hold the BGL
3341     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3342     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3343
3344     # re-read the instance from the configuration after rename
3345     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3346
3347     if inst.disk_template == constants.DT_FILE:
3348       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3349       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3350                                                      old_file_storage_dir,
3351                                                      new_file_storage_dir)
3352       result.Raise()
3353       if not result.data:
3354         raise errors.OpExecError("Could not connect to node '%s' to rename"
3355                                  " directory '%s' to '%s' (but the instance"
3356                                  " has been renamed in Ganeti)" % (
3357                                  inst.primary_node, old_file_storage_dir,
3358                                  new_file_storage_dir))
3359
3360       if not result.data[0]:
3361         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3362                                  " (but the instance has been renamed in"
3363                                  " Ganeti)" % (old_file_storage_dir,
3364                                                new_file_storage_dir))
3365
3366     _StartInstanceDisks(self, inst, None)
3367     try:
3368       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3369                                                  old_name)
3370       msg = result.RemoteFailMsg()
3371       if msg:
3372         msg = ("Could not run OS rename script for instance %s on node %s"
3373                " (but the instance has been renamed in Ganeti): %s" %
3374                (inst.name, inst.primary_node, msg))
3375         self.proc.LogWarning(msg)
3376     finally:
3377       _ShutdownInstanceDisks(self, inst)
3378
3379
3380 class LURemoveInstance(LogicalUnit):
3381   """Remove an instance.
3382
3383   """
3384   HPATH = "instance-remove"
3385   HTYPE = constants.HTYPE_INSTANCE
3386   _OP_REQP = ["instance_name", "ignore_failures"]
3387   REQ_BGL = False
3388
3389   def ExpandNames(self):
3390     self._ExpandAndLockInstance()
3391     self.needed_locks[locking.LEVEL_NODE] = []
3392     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3393
3394   def DeclareLocks(self, level):
3395     if level == locking.LEVEL_NODE:
3396       self._LockInstancesNodes()
3397
3398   def BuildHooksEnv(self):
3399     """Build hooks env.
3400
3401     This runs on master, primary and secondary nodes of the instance.
3402
3403     """
3404     env = _BuildInstanceHookEnvByObject(self, self.instance)
3405     nl = [self.cfg.GetMasterNode()]
3406     return env, nl, nl
3407
3408   def CheckPrereq(self):
3409     """Check prerequisites.
3410
3411     This checks that the instance is in the cluster.
3412
3413     """
3414     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3415     assert self.instance is not None, \
3416       "Cannot retrieve locked instance %s" % self.op.instance_name
3417
3418   def Exec(self, feedback_fn):
3419     """Remove the instance.
3420
3421     """
3422     instance = self.instance
3423     logging.info("Shutting down instance %s on node %s",
3424                  instance.name, instance.primary_node)
3425
3426     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3427     msg = result.RemoteFailMsg()
3428     if msg:
3429       if self.op.ignore_failures:
3430         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3431       else:
3432         raise errors.OpExecError("Could not shutdown instance %s on"
3433                                  " node %s: %s" %
3434                                  (instance.name, instance.primary_node, msg))
3435
3436     logging.info("Removing block devices for instance %s", instance.name)
3437
3438     if not _RemoveDisks(self, instance):
3439       if self.op.ignore_failures:
3440         feedback_fn("Warning: can't remove instance's disks")
3441       else:
3442         raise errors.OpExecError("Can't remove instance's disks")
3443
3444     logging.info("Removing instance %s out of cluster config", instance.name)
3445
3446     self.cfg.RemoveInstance(instance.name)
3447     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3448
3449
3450 class LUQueryInstances(NoHooksLU):
3451   """Logical unit for querying instances.
3452
3453   """
3454   _OP_REQP = ["output_fields", "names", "use_locking"]
3455   REQ_BGL = False
3456   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3457                                     "admin_state",
3458                                     "disk_template", "ip", "mac", "bridge",
3459                                     "sda_size", "sdb_size", "vcpus", "tags",
3460                                     "network_port", "beparams",
3461                                     r"(disk)\.(size)/([0-9]+)",
3462                                     r"(disk)\.(sizes)", "disk_usage",
3463                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3464                                     r"(nic)\.(macs|ips|bridges)",
3465                                     r"(disk|nic)\.(count)",
3466                                     "serial_no", "hypervisor", "hvparams",] +
3467                                   ["hv/%s" % name
3468                                    for name in constants.HVS_PARAMETERS] +
3469                                   ["be/%s" % name
3470                                    for name in constants.BES_PARAMETERS])
3471   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3472
3473
3474   def ExpandNames(self):
3475     _CheckOutputFields(static=self._FIELDS_STATIC,
3476                        dynamic=self._FIELDS_DYNAMIC,
3477                        selected=self.op.output_fields)
3478
3479     self.needed_locks = {}
3480     self.share_locks[locking.LEVEL_INSTANCE] = 1
3481     self.share_locks[locking.LEVEL_NODE] = 1
3482
3483     if self.op.names:
3484       self.wanted = _GetWantedInstances(self, self.op.names)
3485     else:
3486       self.wanted = locking.ALL_SET
3487
3488     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3489     self.do_locking = self.do_node_query and self.op.use_locking
3490     if self.do_locking:
3491       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3492       self.needed_locks[locking.LEVEL_NODE] = []
3493       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3494
3495   def DeclareLocks(self, level):
3496     if level == locking.LEVEL_NODE and self.do_locking:
3497       self._LockInstancesNodes()
3498
3499   def CheckPrereq(self):
3500     """Check prerequisites.
3501
3502     """
3503     pass
3504
3505   def Exec(self, feedback_fn):
3506     """Computes the list of nodes and their attributes.
3507
3508     """
3509     all_info = self.cfg.GetAllInstancesInfo()
3510     if self.wanted == locking.ALL_SET:
3511       # caller didn't specify instance names, so ordering is not important
3512       if self.do_locking:
3513         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3514       else:
3515         instance_names = all_info.keys()
3516       instance_names = utils.NiceSort(instance_names)
3517     else:
3518       # caller did specify names, so we must keep the ordering
3519       if self.do_locking:
3520         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3521       else:
3522         tgt_set = all_info.keys()
3523       missing = set(self.wanted).difference(tgt_set)
3524       if missing:
3525         raise errors.OpExecError("Some instances were removed before"
3526                                  " retrieving their data: %s" % missing)
3527       instance_names = self.wanted
3528
3529     instance_list = [all_info[iname] for iname in instance_names]
3530
3531     # begin data gathering
3532
3533     nodes = frozenset([inst.primary_node for inst in instance_list])
3534     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3535
3536     bad_nodes = []
3537     off_nodes = []
3538     if self.do_node_query:
3539       live_data = {}
3540       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3541       for name in nodes:
3542         result = node_data[name]
3543         if result.offline:
3544           # offline nodes will be in both lists
3545           off_nodes.append(name)
3546         if result.failed:
3547           bad_nodes.append(name)
3548         else:
3549           if result.data:
3550             live_data.update(result.data)
3551             # else no instance is alive
3552     else:
3553       live_data = dict([(name, {}) for name in instance_names])
3554
3555     # end data gathering
3556
3557     HVPREFIX = "hv/"
3558     BEPREFIX = "be/"
3559     output = []
3560     for instance in instance_list:
3561       iout = []
3562       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3563       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3564       for field in self.op.output_fields:
3565         st_match = self._FIELDS_STATIC.Matches(field)
3566         if field == "name":
3567           val = instance.name
3568         elif field == "os":
3569           val = instance.os
3570         elif field == "pnode":
3571           val = instance.primary_node
3572         elif field == "snodes":
3573           val = list(instance.secondary_nodes)
3574         elif field == "admin_state":
3575           val = instance.admin_up
3576         elif field == "oper_state":
3577           if instance.primary_node in bad_nodes:
3578             val = None
3579           else:
3580             val = bool(live_data.get(instance.name))
3581         elif field == "status":
3582           if instance.primary_node in off_nodes:
3583             val = "ERROR_nodeoffline"
3584           elif instance.primary_node in bad_nodes:
3585             val = "ERROR_nodedown"
3586           else:
3587             running = bool(live_data.get(instance.name))
3588             if running:
3589               if instance.admin_up:
3590                 val = "running"
3591               else:
3592                 val = "ERROR_up"
3593             else:
3594               if instance.admin_up:
3595                 val = "ERROR_down"
3596               else:
3597                 val = "ADMIN_down"
3598         elif field == "oper_ram":
3599           if instance.primary_node in bad_nodes:
3600             val = None
3601           elif instance.name in live_data:
3602             val = live_data[instance.name].get("memory", "?")
3603           else:
3604             val = "-"
3605         elif field == "vcpus":
3606           val = i_be[constants.BE_VCPUS]
3607         elif field == "disk_template":
3608           val = instance.disk_template
3609         elif field == "ip":
3610           if instance.nics:
3611             val = instance.nics[0].ip
3612           else:
3613             val = None
3614         elif field == "bridge":
3615           if instance.nics:
3616             val = instance.nics[0].bridge
3617           else:
3618             val = None
3619         elif field == "mac":
3620           if instance.nics:
3621             val = instance.nics[0].mac
3622           else:
3623             val = None
3624         elif field == "sda_size" or field == "sdb_size":
3625           idx = ord(field[2]) - ord('a')
3626           try:
3627             val = instance.FindDisk(idx).size
3628           except errors.OpPrereqError:
3629             val = None
3630         elif field == "disk_usage": # total disk usage per node
3631           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3632           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3633         elif field == "tags":
3634           val = list(instance.GetTags())
3635         elif field == "serial_no":
3636           val = instance.serial_no
3637         elif field == "network_port":
3638           val = instance.network_port
3639         elif field == "hypervisor":
3640           val = instance.hypervisor
3641         elif field == "hvparams":
3642           val = i_hv
3643         elif (field.startswith(HVPREFIX) and
3644               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3645           val = i_hv.get(field[len(HVPREFIX):], None)
3646         elif field == "beparams":
3647           val = i_be
3648         elif (field.startswith(BEPREFIX) and
3649               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3650           val = i_be.get(field[len(BEPREFIX):], None)
3651         elif st_match and st_match.groups():
3652           # matches a variable list
3653           st_groups = st_match.groups()
3654           if st_groups and st_groups[0] == "disk":
3655             if st_groups[1] == "count":
3656               val = len(instance.disks)
3657             elif st_groups[1] == "sizes":
3658               val = [disk.size for disk in instance.disks]
3659             elif st_groups[1] == "size":
3660               try:
3661                 val = instance.FindDisk(st_groups[2]).size
3662               except errors.OpPrereqError:
3663                 val = None
3664             else:
3665               assert False, "Unhandled disk parameter"
3666           elif st_groups[0] == "nic":
3667             if st_groups[1] == "count":
3668               val = len(instance.nics)
3669             elif st_groups[1] == "macs":
3670               val = [nic.mac for nic in instance.nics]
3671             elif st_groups[1] == "ips":
3672               val = [nic.ip for nic in instance.nics]
3673             elif st_groups[1] == "bridges":
3674               val = [nic.bridge for nic in instance.nics]
3675             else:
3676               # index-based item
3677               nic_idx = int(st_groups[2])
3678               if nic_idx >= len(instance.nics):
3679                 val = None
3680               else:
3681                 if st_groups[1] == "mac":
3682                   val = instance.nics[nic_idx].mac
3683                 elif st_groups[1] == "ip":
3684                   val = instance.nics[nic_idx].ip
3685                 elif st_groups[1] == "bridge":
3686                   val = instance.nics[nic_idx].bridge
3687                 else:
3688                   assert False, "Unhandled NIC parameter"
3689           else:
3690             assert False, ("Declared but unhandled variable parameter '%s'" %
3691                            field)
3692         else:
3693           assert False, "Declared but unhandled parameter '%s'" % field
3694         iout.append(val)
3695       output.append(iout)
3696
3697     return output
3698
3699
3700 class LUFailoverInstance(LogicalUnit):
3701   """Failover an instance.
3702
3703   """
3704   HPATH = "instance-failover"
3705   HTYPE = constants.HTYPE_INSTANCE
3706   _OP_REQP = ["instance_name", "ignore_consistency"]
3707   REQ_BGL = False
3708
3709   def ExpandNames(self):
3710     self._ExpandAndLockInstance()
3711     self.needed_locks[locking.LEVEL_NODE] = []
3712     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3713
3714   def DeclareLocks(self, level):
3715     if level == locking.LEVEL_NODE:
3716       self._LockInstancesNodes()
3717
3718   def BuildHooksEnv(self):
3719     """Build hooks env.
3720
3721     This runs on master, primary and secondary nodes of the instance.
3722
3723     """
3724     env = {
3725       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3726       }
3727     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3728     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3729     return env, nl, nl
3730
3731   def CheckPrereq(self):
3732     """Check prerequisites.
3733
3734     This checks that the instance is in the cluster.
3735
3736     """
3737     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3738     assert self.instance is not None, \
3739       "Cannot retrieve locked instance %s" % self.op.instance_name
3740
3741     bep = self.cfg.GetClusterInfo().FillBE(instance)
3742     if instance.disk_template not in constants.DTS_NET_MIRROR:
3743       raise errors.OpPrereqError("Instance's disk layout is not"
3744                                  " network mirrored, cannot failover.")
3745
3746     secondary_nodes = instance.secondary_nodes
3747     if not secondary_nodes:
3748       raise errors.ProgrammerError("no secondary node but using "
3749                                    "a mirrored disk template")
3750
3751     target_node = secondary_nodes[0]
3752     _CheckNodeOnline(self, target_node)
3753     _CheckNodeNotDrained(self, target_node)
3754
3755     if instance.admin_up:
3756       # check memory requirements on the secondary node
3757       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3758                            instance.name, bep[constants.BE_MEMORY],
3759                            instance.hypervisor)
3760     else:
3761       self.LogInfo("Not checking memory on the secondary node as"
3762                    " instance will not be started")
3763
3764     # check bridge existence
3765     brlist = [nic.bridge for nic in instance.nics]
3766     result = self.rpc.call_bridges_exist(target_node, brlist)
3767     result.Raise()
3768     if not result.data:
3769       raise errors.OpPrereqError("One or more target bridges %s does not"
3770                                  " exist on destination node '%s'" %
3771                                  (brlist, target_node))
3772
3773   def Exec(self, feedback_fn):
3774     """Failover an instance.
3775
3776     The failover is done by shutting it down on its present node and
3777     starting it on the secondary.
3778
3779     """
3780     instance = self.instance
3781
3782     source_node = instance.primary_node
3783     target_node = instance.secondary_nodes[0]
3784
3785     feedback_fn("* checking disk consistency between source and target")
3786     for dev in instance.disks:
3787       # for drbd, these are drbd over lvm
3788       if not _CheckDiskConsistency(self, dev, target_node, False):
3789         if instance.admin_up and not self.op.ignore_consistency:
3790           raise errors.OpExecError("Disk %s is degraded on target node,"
3791                                    " aborting failover." % dev.iv_name)
3792
3793     feedback_fn("* shutting down instance on source node")
3794     logging.info("Shutting down instance %s on node %s",
3795                  instance.name, source_node)
3796
3797     result = self.rpc.call_instance_shutdown(source_node, instance)
3798     msg = result.RemoteFailMsg()
3799     if msg:
3800       if self.op.ignore_consistency:
3801         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3802                              " Proceeding anyway. Please make sure node"
3803                              " %s is down. Error details: %s",
3804                              instance.name, source_node, source_node, msg)
3805       else:
3806         raise errors.OpExecError("Could not shutdown instance %s on"
3807                                  " node %s: %s" %
3808                                  (instance.name, source_node, msg))
3809
3810     feedback_fn("* deactivating the instance's disks on source node")
3811     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3812       raise errors.OpExecError("Can't shut down the instance's disks.")
3813
3814     instance.primary_node = target_node
3815     # distribute new instance config to the other nodes
3816     self.cfg.Update(instance)
3817
3818     # Only start the instance if it's marked as up
3819     if instance.admin_up:
3820       feedback_fn("* activating the instance's disks on target node")
3821       logging.info("Starting instance %s on node %s",
3822                    instance.name, target_node)
3823
3824       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3825                                                ignore_secondaries=True)
3826       if not disks_ok:
3827         _ShutdownInstanceDisks(self, instance)
3828         raise errors.OpExecError("Can't activate the instance's disks")
3829
3830       feedback_fn("* starting the instance on the target node")
3831       result = self.rpc.call_instance_start(target_node, instance, None, None)
3832       msg = result.RemoteFailMsg()
3833       if msg:
3834         _ShutdownInstanceDisks(self, instance)
3835         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3836                                  (instance.name, target_node, msg))
3837
3838
3839 class LUMigrateInstance(LogicalUnit):
3840   """Migrate an instance.
3841
3842   This is migration without shutting down, compared to the failover,
3843   which is done with shutdown.
3844
3845   """
3846   HPATH = "instance-migrate"
3847   HTYPE = constants.HTYPE_INSTANCE
3848   _OP_REQP = ["instance_name", "live", "cleanup"]
3849
3850   REQ_BGL = False
3851
3852   def ExpandNames(self):
3853     self._ExpandAndLockInstance()
3854     self.needed_locks[locking.LEVEL_NODE] = []
3855     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3856
3857   def DeclareLocks(self, level):
3858     if level == locking.LEVEL_NODE:
3859       self._LockInstancesNodes()
3860
3861   def BuildHooksEnv(self):
3862     """Build hooks env.
3863
3864     This runs on master, primary and secondary nodes of the instance.
3865
3866     """
3867     env = _BuildInstanceHookEnvByObject(self, self.instance)
3868     env["MIGRATE_LIVE"] = self.op.live
3869     env["MIGRATE_CLEANUP"] = self.op.cleanup
3870     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3871     return env, nl, nl
3872
3873   def CheckPrereq(self):
3874     """Check prerequisites.
3875
3876     This checks that the instance is in the cluster.
3877
3878     """
3879     instance = self.cfg.GetInstanceInfo(
3880       self.cfg.ExpandInstanceName(self.op.instance_name))
3881     if instance is None:
3882       raise errors.OpPrereqError("Instance '%s' not known" %
3883                                  self.op.instance_name)
3884
3885     if instance.disk_template != constants.DT_DRBD8:
3886       raise errors.OpPrereqError("Instance's disk layout is not"
3887                                  " drbd8, cannot migrate.")
3888
3889     secondary_nodes = instance.secondary_nodes
3890     if not secondary_nodes:
3891       raise errors.ConfigurationError("No secondary node but using"
3892                                       " drbd8 disk template")
3893
3894     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3895
3896     target_node = secondary_nodes[0]
3897     # check memory requirements on the secondary node
3898     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3899                          instance.name, i_be[constants.BE_MEMORY],
3900                          instance.hypervisor)
3901
3902     # check bridge existence
3903     brlist = [nic.bridge for nic in instance.nics]
3904     result = self.rpc.call_bridges_exist(target_node, brlist)
3905     if result.failed or not result.data:
3906       raise errors.OpPrereqError("One or more target bridges %s does not"
3907                                  " exist on destination node '%s'" %
3908                                  (brlist, target_node))
3909
3910     if not self.op.cleanup:
3911       _CheckNodeNotDrained(self, target_node)
3912       result = self.rpc.call_instance_migratable(instance.primary_node,
3913                                                  instance)
3914       msg = result.RemoteFailMsg()
3915       if msg:
3916         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3917                                    msg)
3918
3919     self.instance = instance
3920
3921   def _WaitUntilSync(self):
3922     """Poll with custom rpc for disk sync.
3923
3924     This uses our own step-based rpc call.
3925
3926     """
3927     self.feedback_fn("* wait until resync is done")
3928     all_done = False
3929     while not all_done:
3930       all_done = True
3931       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3932                                             self.nodes_ip,
3933                                             self.instance.disks)
3934       min_percent = 100
3935       for node, nres in result.items():
3936         msg = nres.RemoteFailMsg()
3937         if msg:
3938           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3939                                    (node, msg))
3940         node_done, node_percent = nres.payload
3941         all_done = all_done and node_done
3942         if node_percent is not None:
3943           min_percent = min(min_percent, node_percent)
3944       if not all_done:
3945         if min_percent < 100:
3946           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3947         time.sleep(2)
3948
3949   def _EnsureSecondary(self, node):
3950     """Demote a node to secondary.
3951
3952     """
3953     self.feedback_fn("* switching node %s to secondary mode" % node)
3954
3955     for dev in self.instance.disks:
3956       self.cfg.SetDiskID(dev, node)
3957
3958     result = self.rpc.call_blockdev_close(node, self.instance.name,
3959                                           self.instance.disks)
3960     msg = result.RemoteFailMsg()
3961     if msg:
3962       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3963                                " error %s" % (node, msg))
3964
3965   def _GoStandalone(self):
3966     """Disconnect from the network.
3967
3968     """
3969     self.feedback_fn("* changing into standalone mode")
3970     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3971                                                self.instance.disks)
3972     for node, nres in result.items():
3973       msg = nres.RemoteFailMsg()
3974       if msg:
3975         raise errors.OpExecError("Cannot disconnect disks node %s,"
3976                                  " error %s" % (node, msg))
3977
3978   def _GoReconnect(self, multimaster):
3979     """Reconnect to the network.
3980
3981     """
3982     if multimaster:
3983       msg = "dual-master"
3984     else:
3985       msg = "single-master"
3986     self.feedback_fn("* changing disks into %s mode" % msg)
3987     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3988                                            self.instance.disks,
3989                                            self.instance.name, multimaster)
3990     for node, nres in result.items():
3991       msg = nres.RemoteFailMsg()
3992       if msg:
3993         raise errors.OpExecError("Cannot change disks config on node %s,"
3994                                  " error: %s" % (node, msg))
3995
3996   def _ExecCleanup(self):
3997     """Try to cleanup after a failed migration.
3998
3999     The cleanup is done by:
4000       - check that the instance is running only on one node
4001         (and update the config if needed)
4002       - change disks on its secondary node to secondary
4003       - wait until disks are fully synchronized
4004       - disconnect from the network
4005       - change disks into single-master mode
4006       - wait again until disks are fully synchronized
4007
4008     """
4009     instance = self.instance
4010     target_node = self.target_node
4011     source_node = self.source_node
4012
4013     # check running on only one node
4014     self.feedback_fn("* checking where the instance actually runs"
4015                      " (if this hangs, the hypervisor might be in"
4016                      " a bad state)")
4017     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4018     for node, result in ins_l.items():
4019       result.Raise()
4020       if not isinstance(result.data, list):
4021         raise errors.OpExecError("Can't contact node '%s'" % node)
4022
4023     runningon_source = instance.name in ins_l[source_node].data
4024     runningon_target = instance.name in ins_l[target_node].data
4025
4026     if runningon_source and runningon_target:
4027       raise errors.OpExecError("Instance seems to be running on two nodes,"
4028                                " or the hypervisor is confused. You will have"
4029                                " to ensure manually that it runs only on one"
4030                                " and restart this operation.")
4031
4032     if not (runningon_source or runningon_target):
4033       raise errors.OpExecError("Instance does not seem to be running at all."
4034                                " In this case, it's safer to repair by"
4035                                " running 'gnt-instance stop' to ensure disk"
4036                                " shutdown, and then restarting it.")
4037
4038     if runningon_target:
4039       # the migration has actually succeeded, we need to update the config
4040       self.feedback_fn("* instance running on secondary node (%s),"
4041                        " updating config" % target_node)
4042       instance.primary_node = target_node
4043       self.cfg.Update(instance)
4044       demoted_node = source_node
4045     else:
4046       self.feedback_fn("* instance confirmed to be running on its"
4047                        " primary node (%s)" % source_node)
4048       demoted_node = target_node
4049
4050     self._EnsureSecondary(demoted_node)
4051     try:
4052       self._WaitUntilSync()
4053     except errors.OpExecError:
4054       # we ignore here errors, since if the device is standalone, it
4055       # won't be able to sync
4056       pass
4057     self._GoStandalone()
4058     self._GoReconnect(False)
4059     self._WaitUntilSync()
4060
4061     self.feedback_fn("* done")
4062
4063   def _RevertDiskStatus(self):
4064     """Try to revert the disk status after a failed migration.
4065
4066     """
4067     target_node = self.target_node
4068     try:
4069       self._EnsureSecondary(target_node)
4070       self._GoStandalone()
4071       self._GoReconnect(False)
4072       self._WaitUntilSync()
4073     except errors.OpExecError, err:
4074       self.LogWarning("Migration failed and I can't reconnect the"
4075                       " drives: error '%s'\n"
4076                       "Please look and recover the instance status" %
4077                       str(err))
4078
4079   def _AbortMigration(self):
4080     """Call the hypervisor code to abort a started migration.
4081
4082     """
4083     instance = self.instance
4084     target_node = self.target_node
4085     migration_info = self.migration_info
4086
4087     abort_result = self.rpc.call_finalize_migration(target_node,
4088                                                     instance,
4089                                                     migration_info,
4090                                                     False)
4091     abort_msg = abort_result.RemoteFailMsg()
4092     if abort_msg:
4093       logging.error("Aborting migration failed on target node %s: %s" %
4094                     (target_node, abort_msg))
4095       # Don't raise an exception here, as we stil have to try to revert the
4096       # disk status, even if this step failed.
4097
4098   def _ExecMigration(self):
4099     """Migrate an instance.
4100
4101     The migrate is done by:
4102       - change the disks into dual-master mode
4103       - wait until disks are fully synchronized again
4104       - migrate the instance
4105       - change disks on the new secondary node (the old primary) to secondary
4106       - wait until disks are fully synchronized
4107       - change disks into single-master mode
4108
4109     """
4110     instance = self.instance
4111     target_node = self.target_node
4112     source_node = self.source_node
4113
4114     self.feedback_fn("* checking disk consistency between source and target")
4115     for dev in instance.disks:
4116       if not _CheckDiskConsistency(self, dev, target_node, False):
4117         raise errors.OpExecError("Disk %s is degraded or not fully"
4118                                  " synchronized on target node,"
4119                                  " aborting migrate." % dev.iv_name)
4120
4121     # First get the migration information from the remote node
4122     result = self.rpc.call_migration_info(source_node, instance)
4123     msg = result.RemoteFailMsg()
4124     if msg:
4125       log_err = ("Failed fetching source migration information from %s: %s" %
4126                  (source_node, msg))
4127       logging.error(log_err)
4128       raise errors.OpExecError(log_err)
4129
4130     self.migration_info = migration_info = result.payload
4131
4132     # Then switch the disks to master/master mode
4133     self._EnsureSecondary(target_node)
4134     self._GoStandalone()
4135     self._GoReconnect(True)
4136     self._WaitUntilSync()
4137
4138     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4139     result = self.rpc.call_accept_instance(target_node,
4140                                            instance,
4141                                            migration_info,
4142                                            self.nodes_ip[target_node])
4143
4144     msg = result.RemoteFailMsg()
4145     if msg:
4146       logging.error("Instance pre-migration failed, trying to revert"
4147                     " disk status: %s", msg)
4148       self._AbortMigration()
4149       self._RevertDiskStatus()
4150       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4151                                (instance.name, msg))
4152
4153     self.feedback_fn("* migrating instance to %s" % target_node)
4154     time.sleep(10)
4155     result = self.rpc.call_instance_migrate(source_node, instance,
4156                                             self.nodes_ip[target_node],
4157                                             self.op.live)
4158     msg = result.RemoteFailMsg()
4159     if msg:
4160       logging.error("Instance migration failed, trying to revert"
4161                     " disk status: %s", msg)
4162       self._AbortMigration()
4163       self._RevertDiskStatus()
4164       raise errors.OpExecError("Could not migrate instance %s: %s" %
4165                                (instance.name, msg))
4166     time.sleep(10)
4167
4168     instance.primary_node = target_node
4169     # distribute new instance config to the other nodes
4170     self.cfg.Update(instance)
4171
4172     result = self.rpc.call_finalize_migration(target_node,
4173                                               instance,
4174                                               migration_info,
4175                                               True)
4176     msg = result.RemoteFailMsg()
4177     if msg:
4178       logging.error("Instance migration succeeded, but finalization failed:"
4179                     " %s" % msg)
4180       raise errors.OpExecError("Could not finalize instance migration: %s" %
4181                                msg)
4182
4183     self._EnsureSecondary(source_node)
4184     self._WaitUntilSync()
4185     self._GoStandalone()
4186     self._GoReconnect(False)
4187     self._WaitUntilSync()
4188
4189     self.feedback_fn("* done")
4190
4191   def Exec(self, feedback_fn):
4192     """Perform the migration.
4193
4194     """
4195     self.feedback_fn = feedback_fn
4196
4197     self.source_node = self.instance.primary_node
4198     self.target_node = self.instance.secondary_nodes[0]
4199     self.all_nodes = [self.source_node, self.target_node]
4200     self.nodes_ip = {
4201       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4202       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4203       }
4204     if self.op.cleanup:
4205       return self._ExecCleanup()
4206     else:
4207       return self._ExecMigration()
4208
4209
4210 def _CreateBlockDev(lu, node, instance, device, force_create,
4211                     info, force_open):
4212   """Create a tree of block devices on a given node.
4213
4214   If this device type has to be created on secondaries, create it and
4215   all its children.
4216
4217   If not, just recurse to children keeping the same 'force' value.
4218
4219   @param lu: the lu on whose behalf we execute
4220   @param node: the node on which to create the device
4221   @type instance: L{objects.Instance}
4222   @param instance: the instance which owns the device
4223   @type device: L{objects.Disk}
4224   @param device: the device to create
4225   @type force_create: boolean
4226   @param force_create: whether to force creation of this device; this
4227       will be change to True whenever we find a device which has
4228       CreateOnSecondary() attribute
4229   @param info: the extra 'metadata' we should attach to the device
4230       (this will be represented as a LVM tag)
4231   @type force_open: boolean
4232   @param force_open: this parameter will be passes to the
4233       L{backend.BlockdevCreate} function where it specifies
4234       whether we run on primary or not, and it affects both
4235       the child assembly and the device own Open() execution
4236
4237   """
4238   if device.CreateOnSecondary():
4239     force_create = True
4240
4241   if device.children:
4242     for child in device.children:
4243       _CreateBlockDev(lu, node, instance, child, force_create,
4244                       info, force_open)
4245
4246   if not force_create:
4247     return
4248
4249   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4250
4251
4252 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4253   """Create a single block device on a given node.
4254
4255   This will not recurse over children of the device, so they must be
4256   created in advance.
4257
4258   @param lu: the lu on whose behalf we execute
4259   @param node: the node on which to create the device
4260   @type instance: L{objects.Instance}
4261   @param instance: the instance which owns the device
4262   @type device: L{objects.Disk}
4263   @param device: the device to create
4264   @param info: the extra 'metadata' we should attach to the device
4265       (this will be represented as a LVM tag)
4266   @type force_open: boolean
4267   @param force_open: this parameter will be passes to the
4268       L{backend.BlockdevCreate} function where it specifies
4269       whether we run on primary or not, and it affects both
4270       the child assembly and the device own Open() execution
4271
4272   """
4273   lu.cfg.SetDiskID(device, node)
4274   result = lu.rpc.call_blockdev_create(node, device, device.size,
4275                                        instance.name, force_open, info)
4276   msg = result.RemoteFailMsg()
4277   if msg:
4278     raise errors.OpExecError("Can't create block device %s on"
4279                              " node %s for instance %s: %s" %
4280                              (device, node, instance.name, msg))
4281   if device.physical_id is None:
4282     device.physical_id = result.payload
4283
4284
4285 def _GenerateUniqueNames(lu, exts):
4286   """Generate a suitable LV name.
4287
4288   This will generate a logical volume name for the given instance.
4289
4290   """
4291   results = []
4292   for val in exts:
4293     new_id = lu.cfg.GenerateUniqueID()
4294     results.append("%s%s" % (new_id, val))
4295   return results
4296
4297
4298 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4299                          p_minor, s_minor):
4300   """Generate a drbd8 device complete with its children.
4301
4302   """
4303   port = lu.cfg.AllocatePort()
4304   vgname = lu.cfg.GetVGName()
4305   shared_secret = lu.cfg.GenerateDRBDSecret()
4306   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4307                           logical_id=(vgname, names[0]))
4308   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4309                           logical_id=(vgname, names[1]))
4310   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4311                           logical_id=(primary, secondary, port,
4312                                       p_minor, s_minor,
4313                                       shared_secret),
4314                           children=[dev_data, dev_meta],
4315                           iv_name=iv_name)
4316   return drbd_dev
4317
4318
4319 def _GenerateDiskTemplate(lu, template_name,
4320                           instance_name, primary_node,
4321                           secondary_nodes, disk_info,
4322                           file_storage_dir, file_driver,
4323                           base_index):
4324   """Generate the entire disk layout for a given template type.
4325
4326   """
4327   #TODO: compute space requirements
4328
4329   vgname = lu.cfg.GetVGName()
4330   disk_count = len(disk_info)
4331   disks = []
4332   if template_name == constants.DT_DISKLESS:
4333     pass
4334   elif template_name == constants.DT_PLAIN:
4335     if len(secondary_nodes) != 0:
4336       raise errors.ProgrammerError("Wrong template configuration")
4337
4338     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4339                                       for i in range(disk_count)])
4340     for idx, disk in enumerate(disk_info):
4341       disk_index = idx + base_index
4342       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4343                               logical_id=(vgname, names[idx]),
4344                               iv_name="disk/%d" % disk_index,
4345                               mode=disk["mode"])
4346       disks.append(disk_dev)
4347   elif template_name == constants.DT_DRBD8:
4348     if len(secondary_nodes) != 1:
4349       raise errors.ProgrammerError("Wrong template configuration")
4350     remote_node = secondary_nodes[0]
4351     minors = lu.cfg.AllocateDRBDMinor(
4352       [primary_node, remote_node] * len(disk_info), instance_name)
4353
4354     names = []
4355     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4356                                                for i in range(disk_count)]):
4357       names.append(lv_prefix + "_data")
4358       names.append(lv_prefix + "_meta")
4359     for idx, disk in enumerate(disk_info):
4360       disk_index = idx + base_index
4361       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4362                                       disk["size"], names[idx*2:idx*2+2],
4363                                       "disk/%d" % disk_index,
4364                                       minors[idx*2], minors[idx*2+1])
4365       disk_dev.mode = disk["mode"]
4366       disks.append(disk_dev)
4367   elif template_name == constants.DT_FILE:
4368     if len(secondary_nodes) != 0:
4369       raise errors.ProgrammerError("Wrong template configuration")
4370
4371     for idx, disk in enumerate(disk_info):
4372       disk_index = idx + base_index
4373       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4374                               iv_name="disk/%d" % disk_index,
4375                               logical_id=(file_driver,
4376                                           "%s/disk%d" % (file_storage_dir,
4377                                                          disk_index)),
4378                               mode=disk["mode"])
4379       disks.append(disk_dev)
4380   else:
4381     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4382   return disks
4383
4384
4385 def _GetInstanceInfoText(instance):
4386   """Compute that text that should be added to the disk's metadata.
4387
4388   """
4389   return "originstname+%s" % instance.name
4390
4391
4392 def _CreateDisks(lu, instance):
4393   """Create all disks for an instance.
4394
4395   This abstracts away some work from AddInstance.
4396
4397   @type lu: L{LogicalUnit}
4398   @param lu: the logical unit on whose behalf we execute
4399   @type instance: L{objects.Instance}
4400   @param instance: the instance whose disks we should create
4401   @rtype: boolean
4402   @return: the success of the creation
4403
4404   """
4405   info = _GetInstanceInfoText(instance)
4406   pnode = instance.primary_node
4407
4408   if instance.disk_template == constants.DT_FILE:
4409     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4410     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4411
4412     if result.failed or not result.data:
4413       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4414
4415     if not result.data[0]:
4416       raise errors.OpExecError("Failed to create directory '%s'" %
4417                                file_storage_dir)
4418
4419   # Note: this needs to be kept in sync with adding of disks in
4420   # LUSetInstanceParams
4421   for device in instance.disks:
4422     logging.info("Creating volume %s for instance %s",
4423                  device.iv_name, instance.name)
4424     #HARDCODE
4425     for node in instance.all_nodes:
4426       f_create = node == pnode
4427       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4428
4429
4430 def _RemoveDisks(lu, instance):
4431   """Remove all disks for an instance.
4432
4433   This abstracts away some work from `AddInstance()` and
4434   `RemoveInstance()`. Note that in case some of the devices couldn't
4435   be removed, the removal will continue with the other ones (compare
4436   with `_CreateDisks()`).
4437
4438   @type lu: L{LogicalUnit}
4439   @param lu: the logical unit on whose behalf we execute
4440   @type instance: L{objects.Instance}
4441   @param instance: the instance whose disks we should remove
4442   @rtype: boolean
4443   @return: the success of the removal
4444
4445   """
4446   logging.info("Removing block devices for instance %s", instance.name)
4447
4448   all_result = True
4449   for device in instance.disks:
4450     for node, disk in device.ComputeNodeTree(instance.primary_node):
4451       lu.cfg.SetDiskID(disk, node)
4452       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4453       if msg:
4454         lu.LogWarning("Could not remove block device %s on node %s,"
4455                       " continuing anyway: %s", device.iv_name, node, msg)
4456         all_result = False
4457
4458   if instance.disk_template == constants.DT_FILE:
4459     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4460     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4461                                                  file_storage_dir)
4462     if result.failed or not result.data:
4463       logging.error("Could not remove directory '%s'", file_storage_dir)
4464       all_result = False
4465
4466   return all_result
4467
4468
4469 def _ComputeDiskSize(disk_template, disks):
4470   """Compute disk size requirements in the volume group
4471
4472   """
4473   # Required free disk space as a function of disk and swap space
4474   req_size_dict = {
4475     constants.DT_DISKLESS: None,
4476     constants.DT_PLAIN: sum(d["size"] for d in disks),
4477     # 128 MB are added for drbd metadata for each disk
4478     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4479     constants.DT_FILE: None,
4480   }
4481
4482   if disk_template not in req_size_dict:
4483     raise errors.ProgrammerError("Disk template '%s' size requirement"
4484                                  " is unknown" %  disk_template)
4485
4486   return req_size_dict[disk_template]
4487
4488
4489 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4490   """Hypervisor parameter validation.
4491
4492   This function abstract the hypervisor parameter validation to be
4493   used in both instance create and instance modify.
4494
4495   @type lu: L{LogicalUnit}
4496   @param lu: the logical unit for which we check
4497   @type nodenames: list
4498   @param nodenames: the list of nodes on which we should check
4499   @type hvname: string
4500   @param hvname: the name of the hypervisor we should use
4501   @type hvparams: dict
4502   @param hvparams: the parameters which we need to check
4503   @raise errors.OpPrereqError: if the parameters are not valid
4504
4505   """
4506   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4507                                                   hvname,
4508                                                   hvparams)
4509   for node in nodenames:
4510     info = hvinfo[node]
4511     if info.offline:
4512       continue
4513     msg = info.RemoteFailMsg()
4514     if msg:
4515       raise errors.OpPrereqError("Hypervisor parameter validation"
4516                                  " failed on node %s: %s" % (node, msg))
4517
4518
4519 class LUCreateInstance(LogicalUnit):
4520   """Create an instance.
4521
4522   """
4523   HPATH = "instance-add"
4524   HTYPE = constants.HTYPE_INSTANCE
4525   _OP_REQP = ["instance_name", "disks", "disk_template",
4526               "mode", "start",
4527               "wait_for_sync", "ip_check", "nics",
4528               "hvparams", "beparams"]
4529   REQ_BGL = False
4530
4531   def _ExpandNode(self, node):
4532     """Expands and checks one node name.
4533
4534     """
4535     node_full = self.cfg.ExpandNodeName(node)
4536     if node_full is None:
4537       raise errors.OpPrereqError("Unknown node %s" % node)
4538     return node_full
4539
4540   def ExpandNames(self):
4541     """ExpandNames for CreateInstance.
4542
4543     Figure out the right locks for instance creation.
4544
4545     """
4546     self.needed_locks = {}
4547
4548     # set optional parameters to none if they don't exist
4549     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4550       if not hasattr(self.op, attr):
4551         setattr(self.op, attr, None)
4552
4553     # cheap checks, mostly valid constants given
4554
4555     # verify creation mode
4556     if self.op.mode not in (constants.INSTANCE_CREATE,
4557                             constants.INSTANCE_IMPORT):
4558       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4559                                  self.op.mode)
4560
4561     # disk template and mirror node verification
4562     if self.op.disk_template not in constants.DISK_TEMPLATES:
4563       raise errors.OpPrereqError("Invalid disk template name")
4564
4565     if self.op.hypervisor is None:
4566       self.op.hypervisor = self.cfg.GetHypervisorType()
4567
4568     cluster = self.cfg.GetClusterInfo()
4569     enabled_hvs = cluster.enabled_hypervisors
4570     if self.op.hypervisor not in enabled_hvs:
4571       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4572                                  " cluster (%s)" % (self.op.hypervisor,
4573                                   ",".join(enabled_hvs)))
4574
4575     # check hypervisor parameter syntax (locally)
4576     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4577     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4578                                   self.op.hvparams)
4579     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4580     hv_type.CheckParameterSyntax(filled_hvp)
4581     self.hv_full = filled_hvp
4582
4583     # fill and remember the beparams dict
4584     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4585     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4586                                     self.op.beparams)
4587
4588     #### instance parameters check
4589
4590     # instance name verification
4591     hostname1 = utils.HostInfo(self.op.instance_name)
4592     self.op.instance_name = instance_name = hostname1.name
4593
4594     # this is just a preventive check, but someone might still add this
4595     # instance in the meantime, and creation will fail at lock-add time
4596     if instance_name in self.cfg.GetInstanceList():
4597       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4598                                  instance_name)
4599
4600     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4601
4602     # NIC buildup
4603     self.nics = []
4604     for nic in self.op.nics:
4605       # ip validity checks
4606       ip = nic.get("ip", None)
4607       if ip is None or ip.lower() == "none":
4608         nic_ip = None
4609       elif ip.lower() == constants.VALUE_AUTO:
4610         nic_ip = hostname1.ip
4611       else:
4612         if not utils.IsValidIP(ip):
4613           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4614                                      " like a valid IP" % ip)
4615         nic_ip = ip
4616
4617       # MAC address verification
4618       mac = nic.get("mac", constants.VALUE_AUTO)
4619       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4620         if not utils.IsValidMac(mac.lower()):
4621           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4622                                      mac)
4623         else:
4624           # or validate/reserve the current one
4625           if self.cfg.IsMacInUse(mac):
4626             raise errors.OpPrereqError("MAC address %s already in use"
4627                                        " in cluster" % mac)
4628
4629       # bridge verification
4630       bridge = nic.get("bridge", None)
4631       if bridge is None:
4632         bridge = self.cfg.GetDefBridge()
4633       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4634
4635     # disk checks/pre-build
4636     self.disks = []
4637     for disk in self.op.disks:
4638       mode = disk.get("mode", constants.DISK_RDWR)
4639       if mode not in constants.DISK_ACCESS_SET:
4640         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4641                                    mode)
4642       size = disk.get("size", None)
4643       if size is None:
4644         raise errors.OpPrereqError("Missing disk size")
4645       try:
4646         size = int(size)
4647       except ValueError:
4648         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4649       self.disks.append({"size": size, "mode": mode})
4650
4651     # used in CheckPrereq for ip ping check
4652     self.check_ip = hostname1.ip
4653
4654     # file storage checks
4655     if (self.op.file_driver and
4656         not self.op.file_driver in constants.FILE_DRIVER):
4657       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4658                                  self.op.file_driver)
4659
4660     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4661       raise errors.OpPrereqError("File storage directory path not absolute")
4662
4663     ### Node/iallocator related checks
4664     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4665       raise errors.OpPrereqError("One and only one of iallocator and primary"
4666                                  " node must be given")
4667
4668     if self.op.iallocator:
4669       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4670     else:
4671       self.op.pnode = self._ExpandNode(self.op.pnode)
4672       nodelist = [self.op.pnode]
4673       if self.op.snode is not None:
4674         self.op.snode = self._ExpandNode(self.op.snode)
4675         nodelist.append(self.op.snode)
4676       self.needed_locks[locking.LEVEL_NODE] = nodelist
4677
4678     # in case of import lock the source node too
4679     if self.op.mode == constants.INSTANCE_IMPORT:
4680       src_node = getattr(self.op, "src_node", None)
4681       src_path = getattr(self.op, "src_path", None)
4682
4683       if src_path is None:
4684         self.op.src_path = src_path = self.op.instance_name
4685
4686       if src_node is None:
4687         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4688         self.op.src_node = None
4689         if os.path.isabs(src_path):
4690           raise errors.OpPrereqError("Importing an instance from an absolute"
4691                                      " path requires a source node option.")
4692       else:
4693         self.op.src_node = src_node = self._ExpandNode(src_node)
4694         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4695           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4696         if not os.path.isabs(src_path):
4697           self.op.src_path = src_path = \
4698             os.path.join(constants.EXPORT_DIR, src_path)
4699
4700     else: # INSTANCE_CREATE
4701       if getattr(self.op, "os_type", None) is None:
4702         raise errors.OpPrereqError("No guest OS specified")
4703
4704   def _RunAllocator(self):
4705     """Run the allocator based on input opcode.
4706
4707     """
4708     nics = [n.ToDict() for n in self.nics]
4709     ial = IAllocator(self,
4710                      mode=constants.IALLOCATOR_MODE_ALLOC,
4711                      name=self.op.instance_name,
4712                      disk_template=self.op.disk_template,
4713                      tags=[],
4714                      os=self.op.os_type,
4715                      vcpus=self.be_full[constants.BE_VCPUS],
4716                      mem_size=self.be_full[constants.BE_MEMORY],
4717                      disks=self.disks,
4718                      nics=nics,
4719                      hypervisor=self.op.hypervisor,
4720                      )
4721
4722     ial.Run(self.op.iallocator)
4723
4724     if not ial.success:
4725       raise errors.OpPrereqError("Can't compute nodes using"
4726                                  " iallocator '%s': %s" % (self.op.iallocator,
4727                                                            ial.info))
4728     if len(ial.nodes) != ial.required_nodes:
4729       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4730                                  " of nodes (%s), required %s" %
4731                                  (self.op.iallocator, len(ial.nodes),
4732                                   ial.required_nodes))
4733     self.op.pnode = ial.nodes[0]
4734     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4735                  self.op.instance_name, self.op.iallocator,
4736                  ", ".join(ial.nodes))
4737     if ial.required_nodes == 2:
4738       self.op.snode = ial.nodes[1]
4739
4740   def BuildHooksEnv(self):
4741     """Build hooks env.
4742
4743     This runs on master, primary and secondary nodes of the instance.
4744
4745     """
4746     env = {
4747       "ADD_MODE": self.op.mode,
4748       }
4749     if self.op.mode == constants.INSTANCE_IMPORT:
4750       env["SRC_NODE"] = self.op.src_node
4751       env["SRC_PATH"] = self.op.src_path
4752       env["SRC_IMAGES"] = self.src_images
4753
4754     env.update(_BuildInstanceHookEnv(
4755       name=self.op.instance_name,
4756       primary_node=self.op.pnode,
4757       secondary_nodes=self.secondaries,
4758       status=self.op.start,
4759       os_type=self.op.os_type,
4760       memory=self.be_full[constants.BE_MEMORY],
4761       vcpus=self.be_full[constants.BE_VCPUS],
4762       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4763       disk_template=self.op.disk_template,
4764       disks=[(d["size"], d["mode"]) for d in self.disks],
4765       bep=self.be_full,
4766       hvp=self.hv_full,
4767       hypervisor_name=self.op.hypervisor,
4768     ))
4769
4770     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4771           self.secondaries)
4772     return env, nl, nl
4773
4774
4775   def CheckPrereq(self):
4776     """Check prerequisites.
4777
4778     """
4779     if (not self.cfg.GetVGName() and
4780         self.op.disk_template not in constants.DTS_NOT_LVM):
4781       raise errors.OpPrereqError("Cluster does not support lvm-based"
4782                                  " instances")
4783
4784     if self.op.mode == constants.INSTANCE_IMPORT:
4785       src_node = self.op.src_node
4786       src_path = self.op.src_path
4787
4788       if src_node is None:
4789         exp_list = self.rpc.call_export_list(
4790           self.acquired_locks[locking.LEVEL_NODE])
4791         found = False
4792         for node in exp_list:
4793           if not exp_list[node].failed and src_path in exp_list[node].data:
4794             found = True
4795             self.op.src_node = src_node = node
4796             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4797                                                        src_path)
4798             break
4799         if not found:
4800           raise errors.OpPrereqError("No export found for relative path %s" %
4801                                       src_path)
4802
4803       _CheckNodeOnline(self, src_node)
4804       result = self.rpc.call_export_info(src_node, src_path)
4805       result.Raise()
4806       if not result.data:
4807         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4808
4809       export_info = result.data
4810       if not export_info.has_section(constants.INISECT_EXP):
4811         raise errors.ProgrammerError("Corrupted export config")
4812
4813       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4814       if (int(ei_version) != constants.EXPORT_VERSION):
4815         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4816                                    (ei_version, constants.EXPORT_VERSION))
4817
4818       # Check that the new instance doesn't have less disks than the export
4819       instance_disks = len(self.disks)
4820       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4821       if instance_disks < export_disks:
4822         raise errors.OpPrereqError("Not enough disks to import."
4823                                    " (instance: %d, export: %d)" %
4824                                    (instance_disks, export_disks))
4825
4826       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4827       disk_images = []
4828       for idx in range(export_disks):
4829         option = 'disk%d_dump' % idx
4830         if export_info.has_option(constants.INISECT_INS, option):
4831           # FIXME: are the old os-es, disk sizes, etc. useful?
4832           export_name = export_info.get(constants.INISECT_INS, option)
4833           image = os.path.join(src_path, export_name)
4834           disk_images.append(image)
4835         else:
4836           disk_images.append(False)
4837
4838       self.src_images = disk_images
4839
4840       old_name = export_info.get(constants.INISECT_INS, 'name')
4841       # FIXME: int() here could throw a ValueError on broken exports
4842       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4843       if self.op.instance_name == old_name:
4844         for idx, nic in enumerate(self.nics):
4845           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4846             nic_mac_ini = 'nic%d_mac' % idx
4847             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4848
4849     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4850     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4851     if self.op.start and not self.op.ip_check:
4852       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4853                                  " adding an instance in start mode")
4854
4855     if self.op.ip_check:
4856       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4857         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4858                                    (self.check_ip, self.op.instance_name))
4859
4860     #### mac address generation
4861     # By generating here the mac address both the allocator and the hooks get
4862     # the real final mac address rather than the 'auto' or 'generate' value.
4863     # There is a race condition between the generation and the instance object
4864     # creation, which means that we know the mac is valid now, but we're not
4865     # sure it will be when we actually add the instance. If things go bad
4866     # adding the instance will abort because of a duplicate mac, and the
4867     # creation job will fail.
4868     for nic in self.nics:
4869       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4870         nic.mac = self.cfg.GenerateMAC()
4871
4872     #### allocator run
4873
4874     if self.op.iallocator is not None:
4875       self._RunAllocator()
4876
4877     #### node related checks
4878
4879     # check primary node
4880     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4881     assert self.pnode is not None, \
4882       "Cannot retrieve locked node %s" % self.op.pnode
4883     if pnode.offline:
4884       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4885                                  pnode.name)
4886     if pnode.drained:
4887       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4888                                  pnode.name)
4889
4890     self.secondaries = []
4891
4892     # mirror node verification
4893     if self.op.disk_template in constants.DTS_NET_MIRROR:
4894       if self.op.snode is None:
4895         raise errors.OpPrereqError("The networked disk templates need"
4896                                    " a mirror node")
4897       if self.op.snode == pnode.name:
4898         raise errors.OpPrereqError("The secondary node cannot be"
4899                                    " the primary node.")
4900       _CheckNodeOnline(self, self.op.snode)
4901       _CheckNodeNotDrained(self, self.op.snode)
4902       self.secondaries.append(self.op.snode)
4903
4904     nodenames = [pnode.name] + self.secondaries
4905
4906     req_size = _ComputeDiskSize(self.op.disk_template,
4907                                 self.disks)
4908
4909     # Check lv size requirements
4910     if req_size is not None:
4911       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4912                                          self.op.hypervisor)
4913       for node in nodenames:
4914         info = nodeinfo[node]
4915         info.Raise()
4916         info = info.data
4917         if not info:
4918           raise errors.OpPrereqError("Cannot get current information"
4919                                      " from node '%s'" % node)
4920         vg_free = info.get('vg_free', None)
4921         if not isinstance(vg_free, int):
4922           raise errors.OpPrereqError("Can't compute free disk space on"
4923                                      " node %s" % node)
4924         if req_size > info['vg_free']:
4925           raise errors.OpPrereqError("Not enough disk space on target node %s."
4926                                      " %d MB available, %d MB required" %
4927                                      (node, info['vg_free'], req_size))
4928
4929     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4930
4931     # os verification
4932     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4933     result.Raise()
4934     if not isinstance(result.data, objects.OS) or not result.data:
4935       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4936                                  " primary node"  % self.op.os_type)
4937
4938     # bridge check on primary node
4939     bridges = [n.bridge for n in self.nics]
4940     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4941     result.Raise()
4942     if not result.data:
4943       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4944                                  " exist on destination node '%s'" %
4945                                  (",".join(bridges), pnode.name))
4946
4947     # memory check on primary node
4948     if self.op.start:
4949       _CheckNodeFreeMemory(self, self.pnode.name,
4950                            "creating instance %s" % self.op.instance_name,
4951                            self.be_full[constants.BE_MEMORY],
4952                            self.op.hypervisor)
4953
4954   def Exec(self, feedback_fn):
4955     """Create and add the instance to the cluster.
4956
4957     """
4958     instance = self.op.instance_name
4959     pnode_name = self.pnode.name
4960
4961     ht_kind = self.op.hypervisor
4962     if ht_kind in constants.HTS_REQ_PORT:
4963       network_port = self.cfg.AllocatePort()
4964     else:
4965       network_port = None
4966
4967     ##if self.op.vnc_bind_address is None:
4968     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4969
4970     # this is needed because os.path.join does not accept None arguments
4971     if self.op.file_storage_dir is None:
4972       string_file_storage_dir = ""
4973     else:
4974       string_file_storage_dir = self.op.file_storage_dir
4975
4976     # build the full file storage dir path
4977     file_storage_dir = os.path.normpath(os.path.join(
4978                                         self.cfg.GetFileStorageDir(),
4979                                         string_file_storage_dir, instance))
4980
4981
4982     disks = _GenerateDiskTemplate(self,
4983                                   self.op.disk_template,
4984                                   instance, pnode_name,
4985                                   self.secondaries,
4986                                   self.disks,
4987                                   file_storage_dir,
4988                                   self.op.file_driver,
4989                                   0)
4990
4991     iobj = objects.Instance(name=instance, os=self.op.os_type,
4992                             primary_node=pnode_name,
4993                             nics=self.nics, disks=disks,
4994                             disk_template=self.op.disk_template,
4995                             admin_up=False,
4996                             network_port=network_port,
4997                             beparams=self.op.beparams,
4998                             hvparams=self.op.hvparams,
4999                             hypervisor=self.op.hypervisor,
5000                             )
5001
5002     feedback_fn("* creating instance disks...")
5003     try:
5004       _CreateDisks(self, iobj)
5005     except errors.OpExecError:
5006       self.LogWarning("Device creation failed, reverting...")
5007       try:
5008         _RemoveDisks(self, iobj)
5009       finally:
5010         self.cfg.ReleaseDRBDMinors(instance)
5011         raise
5012
5013     feedback_fn("adding instance %s to cluster config" % instance)
5014
5015     self.cfg.AddInstance(iobj)
5016     # Declare that we don't want to remove the instance lock anymore, as we've
5017     # added the instance to the config
5018     del self.remove_locks[locking.LEVEL_INSTANCE]
5019     # Unlock all the nodes
5020     if self.op.mode == constants.INSTANCE_IMPORT:
5021       nodes_keep = [self.op.src_node]
5022       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5023                        if node != self.op.src_node]
5024       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5025       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5026     else:
5027       self.context.glm.release(locking.LEVEL_NODE)
5028       del self.acquired_locks[locking.LEVEL_NODE]
5029
5030     if self.op.wait_for_sync:
5031       disk_abort = not _WaitForSync(self, iobj)
5032     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5033       # make sure the disks are not degraded (still sync-ing is ok)
5034       time.sleep(15)
5035       feedback_fn("* checking mirrors status")
5036       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5037     else:
5038       disk_abort = False
5039
5040     if disk_abort:
5041       _RemoveDisks(self, iobj)
5042       self.cfg.RemoveInstance(iobj.name)
5043       # Make sure the instance lock gets removed
5044       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5045       raise errors.OpExecError("There are some degraded disks for"
5046                                " this instance")
5047
5048     feedback_fn("creating os for instance %s on node %s" %
5049                 (instance, pnode_name))
5050
5051     if iobj.disk_template != constants.DT_DISKLESS:
5052       if self.op.mode == constants.INSTANCE_CREATE:
5053         feedback_fn("* running the instance OS create scripts...")
5054         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5055         msg = result.RemoteFailMsg()
5056         if msg:
5057           raise errors.OpExecError("Could not add os for instance %s"
5058                                    " on node %s: %s" %
5059                                    (instance, pnode_name, msg))
5060
5061       elif self.op.mode == constants.INSTANCE_IMPORT:
5062         feedback_fn("* running the instance OS import scripts...")
5063         src_node = self.op.src_node
5064         src_images = self.src_images
5065         cluster_name = self.cfg.GetClusterName()
5066         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5067                                                          src_node, src_images,
5068                                                          cluster_name)
5069         import_result.Raise()
5070         for idx, result in enumerate(import_result.data):
5071           if not result:
5072             self.LogWarning("Could not import the image %s for instance"
5073                             " %s, disk %d, on node %s" %
5074                             (src_images[idx], instance, idx, pnode_name))
5075       else:
5076         # also checked in the prereq part
5077         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5078                                      % self.op.mode)
5079
5080     if self.op.start:
5081       iobj.admin_up = True
5082       self.cfg.Update(iobj)
5083       logging.info("Starting instance %s on node %s", instance, pnode_name)
5084       feedback_fn("* starting instance...")
5085       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5086       msg = result.RemoteFailMsg()
5087       if msg:
5088         raise errors.OpExecError("Could not start instance: %s" % msg)
5089
5090
5091 class LUConnectConsole(NoHooksLU):
5092   """Connect to an instance's console.
5093
5094   This is somewhat special in that it returns the command line that
5095   you need to run on the master node in order to connect to the
5096   console.
5097
5098   """
5099   _OP_REQP = ["instance_name"]
5100   REQ_BGL = False
5101
5102   def ExpandNames(self):
5103     self._ExpandAndLockInstance()
5104
5105   def CheckPrereq(self):
5106     """Check prerequisites.
5107
5108     This checks that the instance is in the cluster.
5109
5110     """
5111     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5112     assert self.instance is not None, \
5113       "Cannot retrieve locked instance %s" % self.op.instance_name
5114     _CheckNodeOnline(self, self.instance.primary_node)
5115
5116   def Exec(self, feedback_fn):
5117     """Connect to the console of an instance
5118
5119     """
5120     instance = self.instance
5121     node = instance.primary_node
5122
5123     node_insts = self.rpc.call_instance_list([node],
5124                                              [instance.hypervisor])[node]
5125     node_insts.Raise()
5126
5127     if instance.name not in node_insts.data:
5128       raise errors.OpExecError("Instance %s is not running." % instance.name)
5129
5130     logging.debug("Connecting to console of %s on %s", instance.name, node)
5131
5132     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5133     cluster = self.cfg.GetClusterInfo()
5134     # beparams and hvparams are passed separately, to avoid editing the
5135     # instance and then saving the defaults in the instance itself.
5136     hvparams = cluster.FillHV(instance)
5137     beparams = cluster.FillBE(instance)
5138     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5139
5140     # build ssh cmdline
5141     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5142
5143
5144 class LUReplaceDisks(LogicalUnit):
5145   """Replace the disks of an instance.
5146
5147   """
5148   HPATH = "mirrors-replace"
5149   HTYPE = constants.HTYPE_INSTANCE
5150   _OP_REQP = ["instance_name", "mode", "disks"]
5151   REQ_BGL = False
5152
5153   def CheckArguments(self):
5154     if not hasattr(self.op, "remote_node"):
5155       self.op.remote_node = None
5156     if not hasattr(self.op, "iallocator"):
5157       self.op.iallocator = None
5158
5159     # check for valid parameter combination
5160     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5161     if self.op.mode == constants.REPLACE_DISK_CHG:
5162       if cnt == 2:
5163         raise errors.OpPrereqError("When changing the secondary either an"
5164                                    " iallocator script must be used or the"
5165                                    " new node given")
5166       elif cnt == 0:
5167         raise errors.OpPrereqError("Give either the iallocator or the new"
5168                                    " secondary, not both")
5169     else: # not replacing the secondary
5170       if cnt != 2:
5171         raise errors.OpPrereqError("The iallocator and new node options can"
5172                                    " be used only when changing the"
5173                                    " secondary node")
5174
5175   def ExpandNames(self):
5176     self._ExpandAndLockInstance()
5177
5178     if self.op.iallocator is not None:
5179       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5180     elif self.op.remote_node is not None:
5181       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5182       if remote_node is None:
5183         raise errors.OpPrereqError("Node '%s' not known" %
5184                                    self.op.remote_node)
5185       self.op.remote_node = remote_node
5186       # Warning: do not remove the locking of the new secondary here
5187       # unless DRBD8.AddChildren is changed to work in parallel;
5188       # currently it doesn't since parallel invocations of
5189       # FindUnusedMinor will conflict
5190       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5191       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5192     else:
5193       self.needed_locks[locking.LEVEL_NODE] = []
5194       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5195
5196   def DeclareLocks(self, level):
5197     # If we're not already locking all nodes in the set we have to declare the
5198     # instance's primary/secondary nodes.
5199     if (level == locking.LEVEL_NODE and
5200         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5201       self._LockInstancesNodes()
5202
5203   def _RunAllocator(self):
5204     """Compute a new secondary node using an IAllocator.
5205
5206     """
5207     ial = IAllocator(self,
5208                      mode=constants.IALLOCATOR_MODE_RELOC,
5209                      name=self.op.instance_name,
5210                      relocate_from=[self.sec_node])
5211
5212     ial.Run(self.op.iallocator)
5213
5214     if not ial.success:
5215       raise errors.OpPrereqError("Can't compute nodes using"
5216                                  " iallocator '%s': %s" % (self.op.iallocator,
5217                                                            ial.info))
5218     if len(ial.nodes) != ial.required_nodes:
5219       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5220                                  " of nodes (%s), required %s" %
5221                                  (self.op.iallocator,
5222                                   len(ial.nodes), ial.required_nodes))
5223     self.op.remote_node = ial.nodes[0]
5224     self.LogInfo("Selected new secondary for the instance: %s",
5225                  self.op.remote_node)
5226
5227   def BuildHooksEnv(self):
5228     """Build hooks env.
5229
5230     This runs on the master, the primary and all the secondaries.
5231
5232     """
5233     env = {
5234       "MODE": self.op.mode,
5235       "NEW_SECONDARY": self.op.remote_node,
5236       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5237       }
5238     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5239     nl = [
5240       self.cfg.GetMasterNode(),
5241       self.instance.primary_node,
5242       ]
5243     if self.op.remote_node is not None:
5244       nl.append(self.op.remote_node)
5245     return env, nl, nl
5246
5247   def CheckPrereq(self):
5248     """Check prerequisites.
5249
5250     This checks that the instance is in the cluster.
5251
5252     """
5253     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5254     assert instance is not None, \
5255       "Cannot retrieve locked instance %s" % self.op.instance_name
5256     self.instance = instance
5257
5258     if instance.disk_template != constants.DT_DRBD8:
5259       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5260                                  " instances")
5261
5262     if len(instance.secondary_nodes) != 1:
5263       raise errors.OpPrereqError("The instance has a strange layout,"
5264                                  " expected one secondary but found %d" %
5265                                  len(instance.secondary_nodes))
5266
5267     self.sec_node = instance.secondary_nodes[0]
5268
5269     if self.op.iallocator is not None:
5270       self._RunAllocator()
5271
5272     remote_node = self.op.remote_node
5273     if remote_node is not None:
5274       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5275       assert self.remote_node_info is not None, \
5276         "Cannot retrieve locked node %s" % remote_node
5277     else:
5278       self.remote_node_info = None
5279     if remote_node == instance.primary_node:
5280       raise errors.OpPrereqError("The specified node is the primary node of"
5281                                  " the instance.")
5282     elif remote_node == self.sec_node:
5283       raise errors.OpPrereqError("The specified node is already the"
5284                                  " secondary node of the instance.")
5285
5286     if self.op.mode == constants.REPLACE_DISK_PRI:
5287       n1 = self.tgt_node = instance.primary_node
5288       n2 = self.oth_node = self.sec_node
5289     elif self.op.mode == constants.REPLACE_DISK_SEC:
5290       n1 = self.tgt_node = self.sec_node
5291       n2 = self.oth_node = instance.primary_node
5292     elif self.op.mode == constants.REPLACE_DISK_CHG:
5293       n1 = self.new_node = remote_node
5294       n2 = self.oth_node = instance.primary_node
5295       self.tgt_node = self.sec_node
5296       _CheckNodeNotDrained(self, remote_node)
5297     else:
5298       raise errors.ProgrammerError("Unhandled disk replace mode")
5299
5300     _CheckNodeOnline(self, n1)
5301     _CheckNodeOnline(self, n2)
5302
5303     if not self.op.disks:
5304       self.op.disks = range(len(instance.disks))
5305
5306     for disk_idx in self.op.disks:
5307       instance.FindDisk(disk_idx)
5308
5309   def _ExecD8DiskOnly(self, feedback_fn):
5310     """Replace a disk on the primary or secondary for dbrd8.
5311
5312     The algorithm for replace is quite complicated:
5313
5314       1. for each disk to be replaced:
5315
5316         1. create new LVs on the target node with unique names
5317         1. detach old LVs from the drbd device
5318         1. rename old LVs to name_replaced.<time_t>
5319         1. rename new LVs to old LVs
5320         1. attach the new LVs (with the old names now) to the drbd device
5321
5322       1. wait for sync across all devices
5323
5324       1. for each modified disk:
5325
5326         1. remove old LVs (which have the name name_replaces.<time_t>)
5327
5328     Failures are not very well handled.
5329
5330     """
5331     steps_total = 6
5332     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5333     instance = self.instance
5334     iv_names = {}
5335     vgname = self.cfg.GetVGName()
5336     # start of work
5337     cfg = self.cfg
5338     tgt_node = self.tgt_node
5339     oth_node = self.oth_node
5340
5341     # Step: check device activation
5342     self.proc.LogStep(1, steps_total, "check device existence")
5343     info("checking volume groups")
5344     my_vg = cfg.GetVGName()
5345     results = self.rpc.call_vg_list([oth_node, tgt_node])
5346     if not results:
5347       raise errors.OpExecError("Can't list volume groups on the nodes")
5348     for node in oth_node, tgt_node:
5349       res = results[node]
5350       if res.failed or not res.data or my_vg not in res.data:
5351         raise errors.OpExecError("Volume group '%s' not found on %s" %
5352                                  (my_vg, node))
5353     for idx, dev in enumerate(instance.disks):
5354       if idx not in self.op.disks:
5355         continue
5356       for node in tgt_node, oth_node:
5357         info("checking disk/%d on %s" % (idx, node))
5358         cfg.SetDiskID(dev, node)
5359         result = self.rpc.call_blockdev_find(node, dev)
5360         msg = result.RemoteFailMsg()
5361         if not msg and not result.payload:
5362           msg = "disk not found"
5363         if msg:
5364           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5365                                    (idx, node, msg))
5366
5367     # Step: check other node consistency
5368     self.proc.LogStep(2, steps_total, "check peer consistency")
5369     for idx, dev in enumerate(instance.disks):
5370       if idx not in self.op.disks:
5371         continue
5372       info("checking disk/%d consistency on %s" % (idx, oth_node))
5373       if not _CheckDiskConsistency(self, dev, oth_node,
5374                                    oth_node==instance.primary_node):
5375         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5376                                  " to replace disks on this node (%s)" %
5377                                  (oth_node, tgt_node))
5378
5379     # Step: create new storage
5380     self.proc.LogStep(3, steps_total, "allocate new storage")
5381     for idx, dev in enumerate(instance.disks):
5382       if idx not in self.op.disks:
5383         continue
5384       size = dev.size
5385       cfg.SetDiskID(dev, tgt_node)
5386       lv_names = [".disk%d_%s" % (idx, suf)
5387                   for suf in ["data", "meta"]]
5388       names = _GenerateUniqueNames(self, lv_names)
5389       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5390                              logical_id=(vgname, names[0]))
5391       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5392                              logical_id=(vgname, names[1]))
5393       new_lvs = [lv_data, lv_meta]
5394       old_lvs = dev.children
5395       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5396       info("creating new local storage on %s for %s" %
5397            (tgt_node, dev.iv_name))
5398       # we pass force_create=True to force the LVM creation
5399       for new_lv in new_lvs:
5400         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5401                         _GetInstanceInfoText(instance), False)
5402
5403     # Step: for each lv, detach+rename*2+attach
5404     self.proc.LogStep(4, steps_total, "change drbd configuration")
5405     for dev, old_lvs, new_lvs in iv_names.itervalues():
5406       info("detaching %s drbd from local storage" % dev.iv_name)
5407       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5408       result.Raise()
5409       if not result.data:
5410         raise errors.OpExecError("Can't detach drbd from local storage on node"
5411                                  " %s for device %s" % (tgt_node, dev.iv_name))
5412       #dev.children = []
5413       #cfg.Update(instance)
5414
5415       # ok, we created the new LVs, so now we know we have the needed
5416       # storage; as such, we proceed on the target node to rename
5417       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5418       # using the assumption that logical_id == physical_id (which in
5419       # turn is the unique_id on that node)
5420
5421       # FIXME(iustin): use a better name for the replaced LVs
5422       temp_suffix = int(time.time())
5423       ren_fn = lambda d, suff: (d.physical_id[0],
5424                                 d.physical_id[1] + "_replaced-%s" % suff)
5425       # build the rename list based on what LVs exist on the node
5426       rlist = []
5427       for to_ren in old_lvs:
5428         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5429         if not result.RemoteFailMsg() and result.payload:
5430           # device exists
5431           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5432
5433       info("renaming the old LVs on the target node")
5434       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5435       result.Raise()
5436       if not result.data:
5437         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5438       # now we rename the new LVs to the old LVs
5439       info("renaming the new LVs on the target node")
5440       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5441       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5442       result.Raise()
5443       if not result.data:
5444         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5445
5446       for old, new in zip(old_lvs, new_lvs):
5447         new.logical_id = old.logical_id
5448         cfg.SetDiskID(new, tgt_node)
5449
5450       for disk in old_lvs:
5451         disk.logical_id = ren_fn(disk, temp_suffix)
5452         cfg.SetDiskID(disk, tgt_node)
5453
5454       # now that the new lvs have the old name, we can add them to the device
5455       info("adding new mirror component on %s" % tgt_node)
5456       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5457       if result.failed or not result.data:
5458         for new_lv in new_lvs:
5459           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5460           if msg:
5461             warning("Can't rollback device %s: %s", dev, msg,
5462                     hint="cleanup manually the unused logical volumes")
5463         raise errors.OpExecError("Can't add local storage to drbd")
5464
5465       dev.children = new_lvs
5466       cfg.Update(instance)
5467
5468     # Step: wait for sync
5469
5470     # this can fail as the old devices are degraded and _WaitForSync
5471     # does a combined result over all disks, so we don't check its
5472     # return value
5473     self.proc.LogStep(5, steps_total, "sync devices")
5474     _WaitForSync(self, instance, unlock=True)
5475
5476     # so check manually all the devices
5477     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5478       cfg.SetDiskID(dev, instance.primary_node)
5479       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5480       msg = result.RemoteFailMsg()
5481       if not msg and not result.payload:
5482         msg = "disk not found"
5483       if msg:
5484         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5485                                  (name, msg))
5486       if result.payload[5]:
5487         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5488
5489     # Step: remove old storage
5490     self.proc.LogStep(6, steps_total, "removing old storage")
5491     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5492       info("remove logical volumes for %s" % name)
5493       for lv in old_lvs:
5494         cfg.SetDiskID(lv, tgt_node)
5495         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5496         if msg:
5497           warning("Can't remove old LV: %s" % msg,
5498                   hint="manually remove unused LVs")
5499           continue
5500
5501   def _ExecD8Secondary(self, feedback_fn):
5502     """Replace the secondary node for drbd8.
5503
5504     The algorithm for replace is quite complicated:
5505       - for all disks of the instance:
5506         - create new LVs on the new node with same names
5507         - shutdown the drbd device on the old secondary
5508         - disconnect the drbd network on the primary
5509         - create the drbd device on the new secondary
5510         - network attach the drbd on the primary, using an artifice:
5511           the drbd code for Attach() will connect to the network if it
5512           finds a device which is connected to the good local disks but
5513           not network enabled
5514       - wait for sync across all devices
5515       - remove all disks from the old secondary
5516
5517     Failures are not very well handled.
5518
5519     """
5520     steps_total = 6
5521     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5522     instance = self.instance
5523     iv_names = {}
5524     # start of work
5525     cfg = self.cfg
5526     old_node = self.tgt_node
5527     new_node = self.new_node
5528     pri_node = instance.primary_node
5529     nodes_ip = {
5530       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5531       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5532       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5533       }
5534
5535     # Step: check device activation
5536     self.proc.LogStep(1, steps_total, "check device existence")
5537     info("checking volume groups")
5538     my_vg = cfg.GetVGName()
5539     results = self.rpc.call_vg_list([pri_node, new_node])
5540     for node in pri_node, new_node:
5541       res = results[node]
5542       if res.failed or not res.data or my_vg not in res.data:
5543         raise errors.OpExecError("Volume group '%s' not found on %s" %
5544                                  (my_vg, node))
5545     for idx, dev in enumerate(instance.disks):
5546       if idx not in self.op.disks:
5547         continue
5548       info("checking disk/%d on %s" % (idx, pri_node))
5549       cfg.SetDiskID(dev, pri_node)
5550       result = self.rpc.call_blockdev_find(pri_node, dev)
5551       msg = result.RemoteFailMsg()
5552       if not msg and not result.payload:
5553         msg = "disk not found"
5554       if msg:
5555         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5556                                  (idx, pri_node, msg))
5557
5558     # Step: check other node consistency
5559     self.proc.LogStep(2, steps_total, "check peer consistency")
5560     for idx, dev in enumerate(instance.disks):
5561       if idx not in self.op.disks:
5562         continue
5563       info("checking disk/%d consistency on %s" % (idx, pri_node))
5564       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5565         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5566                                  " unsafe to replace the secondary" %
5567                                  pri_node)
5568
5569     # Step: create new storage
5570     self.proc.LogStep(3, steps_total, "allocate new storage")
5571     for idx, dev in enumerate(instance.disks):
5572       info("adding new local storage on %s for disk/%d" %
5573            (new_node, idx))
5574       # we pass force_create=True to force LVM creation
5575       for new_lv in dev.children:
5576         _CreateBlockDev(self, new_node, instance, new_lv, True,
5577                         _GetInstanceInfoText(instance), False)
5578
5579     # Step 4: dbrd minors and drbd setups changes
5580     # after this, we must manually remove the drbd minors on both the
5581     # error and the success paths
5582     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5583                                    instance.name)
5584     logging.debug("Allocated minors %s" % (minors,))
5585     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5586     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5587       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5588       # create new devices on new_node; note that we create two IDs:
5589       # one without port, so the drbd will be activated without
5590       # networking information on the new node at this stage, and one
5591       # with network, for the latter activation in step 4
5592       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5593       if pri_node == o_node1:
5594         p_minor = o_minor1
5595       else:
5596         p_minor = o_minor2
5597
5598       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5599       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5600
5601       iv_names[idx] = (dev, dev.children, new_net_id)
5602       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5603                     new_net_id)
5604       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5605                               logical_id=new_alone_id,
5606                               children=dev.children,
5607                               size=dev.size)
5608       try:
5609         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5610                               _GetInstanceInfoText(instance), False)
5611       except errors.GenericError:
5612         self.cfg.ReleaseDRBDMinors(instance.name)
5613         raise
5614
5615     for idx, dev in enumerate(instance.disks):
5616       # we have new devices, shutdown the drbd on the old secondary
5617       info("shutting down drbd for disk/%d on old node" % idx)
5618       cfg.SetDiskID(dev, old_node)
5619       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5620       if msg:
5621         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5622                 (idx, msg),
5623                 hint="Please cleanup this device manually as soon as possible")
5624
5625     info("detaching primary drbds from the network (=> standalone)")
5626     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5627                                                instance.disks)[pri_node]
5628
5629     msg = result.RemoteFailMsg()
5630     if msg:
5631       # detaches didn't succeed (unlikely)
5632       self.cfg.ReleaseDRBDMinors(instance.name)
5633       raise errors.OpExecError("Can't detach the disks from the network on"
5634                                " old node: %s" % (msg,))
5635
5636     # if we managed to detach at least one, we update all the disks of
5637     # the instance to point to the new secondary
5638     info("updating instance configuration")
5639     for dev, _, new_logical_id in iv_names.itervalues():
5640       dev.logical_id = new_logical_id
5641       cfg.SetDiskID(dev, pri_node)
5642     cfg.Update(instance)
5643
5644     # and now perform the drbd attach
5645     info("attaching primary drbds to new secondary (standalone => connected)")
5646     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5647                                            instance.disks, instance.name,
5648                                            False)
5649     for to_node, to_result in result.items():
5650       msg = to_result.RemoteFailMsg()
5651       if msg:
5652         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5653                 hint="please do a gnt-instance info to see the"
5654                 " status of disks")
5655
5656     # this can fail as the old devices are degraded and _WaitForSync
5657     # does a combined result over all disks, so we don't check its
5658     # return value
5659     self.proc.LogStep(5, steps_total, "sync devices")
5660     _WaitForSync(self, instance, unlock=True)
5661
5662     # so check manually all the devices
5663     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5664       cfg.SetDiskID(dev, pri_node)
5665       result = self.rpc.call_blockdev_find(pri_node, dev)
5666       msg = result.RemoteFailMsg()
5667       if not msg and not result.payload:
5668         msg = "disk not found"
5669       if msg:
5670         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5671                                  (idx, msg))
5672       if result.payload[5]:
5673         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5674
5675     self.proc.LogStep(6, steps_total, "removing old storage")
5676     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5677       info("remove logical volumes for disk/%d" % idx)
5678       for lv in old_lvs:
5679         cfg.SetDiskID(lv, old_node)
5680         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5681         if msg:
5682           warning("Can't remove LV on old secondary: %s", msg,
5683                   hint="Cleanup stale volumes by hand")
5684
5685   def Exec(self, feedback_fn):
5686     """Execute disk replacement.
5687
5688     This dispatches the disk replacement to the appropriate handler.
5689
5690     """
5691     instance = self.instance
5692
5693     # Activate the instance disks if we're replacing them on a down instance
5694     if not instance.admin_up:
5695       _StartInstanceDisks(self, instance, True)
5696
5697     if self.op.mode == constants.REPLACE_DISK_CHG:
5698       fn = self._ExecD8Secondary
5699     else:
5700       fn = self._ExecD8DiskOnly
5701
5702     ret = fn(feedback_fn)
5703
5704     # Deactivate the instance disks if we're replacing them on a down instance
5705     if not instance.admin_up:
5706       _SafeShutdownInstanceDisks(self, instance)
5707
5708     return ret
5709
5710
5711 class LUGrowDisk(LogicalUnit):
5712   """Grow a disk of an instance.
5713
5714   """
5715   HPATH = "disk-grow"
5716   HTYPE = constants.HTYPE_INSTANCE
5717   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5718   REQ_BGL = False
5719
5720   def ExpandNames(self):
5721     self._ExpandAndLockInstance()
5722     self.needed_locks[locking.LEVEL_NODE] = []
5723     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5724
5725   def DeclareLocks(self, level):
5726     if level == locking.LEVEL_NODE:
5727       self._LockInstancesNodes()
5728
5729   def BuildHooksEnv(self):
5730     """Build hooks env.
5731
5732     This runs on the master, the primary and all the secondaries.
5733
5734     """
5735     env = {
5736       "DISK": self.op.disk,
5737       "AMOUNT": self.op.amount,
5738       }
5739     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5740     nl = [
5741       self.cfg.GetMasterNode(),
5742       self.instance.primary_node,
5743       ]
5744     return env, nl, nl
5745
5746   def CheckPrereq(self):
5747     """Check prerequisites.
5748
5749     This checks that the instance is in the cluster.
5750
5751     """
5752     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5753     assert instance is not None, \
5754       "Cannot retrieve locked instance %s" % self.op.instance_name
5755     nodenames = list(instance.all_nodes)
5756     for node in nodenames:
5757       _CheckNodeOnline(self, node)
5758
5759
5760     self.instance = instance
5761
5762     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5763       raise errors.OpPrereqError("Instance's disk layout does not support"
5764                                  " growing.")
5765
5766     self.disk = instance.FindDisk(self.op.disk)
5767
5768     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5769                                        instance.hypervisor)
5770     for node in nodenames:
5771       info = nodeinfo[node]
5772       if info.failed or not info.data:
5773         raise errors.OpPrereqError("Cannot get current information"
5774                                    " from node '%s'" % node)
5775       vg_free = info.data.get('vg_free', None)
5776       if not isinstance(vg_free, int):
5777         raise errors.OpPrereqError("Can't compute free disk space on"
5778                                    " node %s" % node)
5779       if self.op.amount > vg_free:
5780         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5781                                    " %d MiB available, %d MiB required" %
5782                                    (node, vg_free, self.op.amount))
5783
5784   def Exec(self, feedback_fn):
5785     """Execute disk grow.
5786
5787     """
5788     instance = self.instance
5789     disk = self.disk
5790     for node in instance.all_nodes:
5791       self.cfg.SetDiskID(disk, node)
5792       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5793       msg = result.RemoteFailMsg()
5794       if msg:
5795         raise errors.OpExecError("Grow request failed to node %s: %s" %
5796                                  (node, msg))
5797     disk.RecordGrow(self.op.amount)
5798     self.cfg.Update(instance)
5799     if self.op.wait_for_sync:
5800       disk_abort = not _WaitForSync(self, instance)
5801       if disk_abort:
5802         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5803                              " status.\nPlease check the instance.")
5804
5805
5806 class LUQueryInstanceData(NoHooksLU):
5807   """Query runtime instance data.
5808
5809   """
5810   _OP_REQP = ["instances", "static"]
5811   REQ_BGL = False
5812
5813   def ExpandNames(self):
5814     self.needed_locks = {}
5815     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5816
5817     if not isinstance(self.op.instances, list):
5818       raise errors.OpPrereqError("Invalid argument type 'instances'")
5819
5820     if self.op.instances:
5821       self.wanted_names = []
5822       for name in self.op.instances:
5823         full_name = self.cfg.ExpandInstanceName(name)
5824         if full_name is None:
5825           raise errors.OpPrereqError("Instance '%s' not known" % name)
5826         self.wanted_names.append(full_name)
5827       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5828     else:
5829       self.wanted_names = None
5830       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5831
5832     self.needed_locks[locking.LEVEL_NODE] = []
5833     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5834
5835   def DeclareLocks(self, level):
5836     if level == locking.LEVEL_NODE:
5837       self._LockInstancesNodes()
5838
5839   def CheckPrereq(self):
5840     """Check prerequisites.
5841
5842     This only checks the optional instance list against the existing names.
5843
5844     """
5845     if self.wanted_names is None:
5846       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5847
5848     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5849                              in self.wanted_names]
5850     return
5851
5852   def _ComputeDiskStatus(self, instance, snode, dev):
5853     """Compute block device status.
5854
5855     """
5856     static = self.op.static
5857     if not static:
5858       self.cfg.SetDiskID(dev, instance.primary_node)
5859       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5860       if dev_pstatus.offline:
5861         dev_pstatus = None
5862       else:
5863         msg = dev_pstatus.RemoteFailMsg()
5864         if msg:
5865           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5866                                    (instance.name, msg))
5867         dev_pstatus = dev_pstatus.payload
5868     else:
5869       dev_pstatus = None
5870
5871     if dev.dev_type in constants.LDS_DRBD:
5872       # we change the snode then (otherwise we use the one passed in)
5873       if dev.logical_id[0] == instance.primary_node:
5874         snode = dev.logical_id[1]
5875       else:
5876         snode = dev.logical_id[0]
5877
5878     if snode and not static:
5879       self.cfg.SetDiskID(dev, snode)
5880       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5881       if dev_sstatus.offline:
5882         dev_sstatus = None
5883       else:
5884         msg = dev_sstatus.RemoteFailMsg()
5885         if msg:
5886           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5887                                    (instance.name, msg))
5888         dev_sstatus = dev_sstatus.payload
5889     else:
5890       dev_sstatus = None
5891
5892     if dev.children:
5893       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5894                       for child in dev.children]
5895     else:
5896       dev_children = []
5897
5898     data = {
5899       "iv_name": dev.iv_name,
5900       "dev_type": dev.dev_type,
5901       "logical_id": dev.logical_id,
5902       "physical_id": dev.physical_id,
5903       "pstatus": dev_pstatus,
5904       "sstatus": dev_sstatus,
5905       "children": dev_children,
5906       "mode": dev.mode,
5907       "size": dev.size,
5908       }
5909
5910     return data
5911
5912   def Exec(self, feedback_fn):
5913     """Gather and return data"""
5914     result = {}
5915
5916     cluster = self.cfg.GetClusterInfo()
5917
5918     for instance in self.wanted_instances:
5919       if not self.op.static:
5920         remote_info = self.rpc.call_instance_info(instance.primary_node,
5921                                                   instance.name,
5922                                                   instance.hypervisor)
5923         remote_info.Raise()
5924         remote_info = remote_info.data
5925         if remote_info and "state" in remote_info:
5926           remote_state = "up"
5927         else:
5928           remote_state = "down"
5929       else:
5930         remote_state = None
5931       if instance.admin_up:
5932         config_state = "up"
5933       else:
5934         config_state = "down"
5935
5936       disks = [self._ComputeDiskStatus(instance, None, device)
5937                for device in instance.disks]
5938
5939       idict = {
5940         "name": instance.name,
5941         "config_state": config_state,
5942         "run_state": remote_state,
5943         "pnode": instance.primary_node,
5944         "snodes": instance.secondary_nodes,
5945         "os": instance.os,
5946         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5947         "disks": disks,
5948         "hypervisor": instance.hypervisor,
5949         "network_port": instance.network_port,
5950         "hv_instance": instance.hvparams,
5951         "hv_actual": cluster.FillHV(instance),
5952         "be_instance": instance.beparams,
5953         "be_actual": cluster.FillBE(instance),
5954         }
5955
5956       result[instance.name] = idict
5957
5958     return result
5959
5960
5961 class LUSetInstanceParams(LogicalUnit):
5962   """Modifies an instances's parameters.
5963
5964   """
5965   HPATH = "instance-modify"
5966   HTYPE = constants.HTYPE_INSTANCE
5967   _OP_REQP = ["instance_name"]
5968   REQ_BGL = False
5969
5970   def CheckArguments(self):
5971     if not hasattr(self.op, 'nics'):
5972       self.op.nics = []
5973     if not hasattr(self.op, 'disks'):
5974       self.op.disks = []
5975     if not hasattr(self.op, 'beparams'):
5976       self.op.beparams = {}
5977     if not hasattr(self.op, 'hvparams'):
5978       self.op.hvparams = {}
5979     self.op.force = getattr(self.op, "force", False)
5980     if not (self.op.nics or self.op.disks or
5981             self.op.hvparams or self.op.beparams):
5982       raise errors.OpPrereqError("No changes submitted")
5983
5984     # Disk validation
5985     disk_addremove = 0
5986     for disk_op, disk_dict in self.op.disks:
5987       if disk_op == constants.DDM_REMOVE:
5988         disk_addremove += 1
5989         continue
5990       elif disk_op == constants.DDM_ADD:
5991         disk_addremove += 1
5992       else:
5993         if not isinstance(disk_op, int):
5994           raise errors.OpPrereqError("Invalid disk index")
5995       if disk_op == constants.DDM_ADD:
5996         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5997         if mode not in constants.DISK_ACCESS_SET:
5998           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5999         size = disk_dict.get('size', None)
6000         if size is None:
6001           raise errors.OpPrereqError("Required disk parameter size missing")
6002         try:
6003           size = int(size)
6004         except ValueError, err:
6005           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6006                                      str(err))
6007         disk_dict['size'] = size
6008       else:
6009         # modification of disk
6010         if 'size' in disk_dict:
6011           raise errors.OpPrereqError("Disk size change not possible, use"
6012                                      " grow-disk")
6013
6014     if disk_addremove > 1:
6015       raise errors.OpPrereqError("Only one disk add or remove operation"
6016                                  " supported at a time")
6017
6018     # NIC validation
6019     nic_addremove = 0
6020     for nic_op, nic_dict in self.op.nics:
6021       if nic_op == constants.DDM_REMOVE:
6022         nic_addremove += 1
6023         continue
6024       elif nic_op == constants.DDM_ADD:
6025         nic_addremove += 1
6026       else:
6027         if not isinstance(nic_op, int):
6028           raise errors.OpPrereqError("Invalid nic index")
6029
6030       # nic_dict should be a dict
6031       nic_ip = nic_dict.get('ip', None)
6032       if nic_ip is not None:
6033         if nic_ip.lower() == constants.VALUE_NONE:
6034           nic_dict['ip'] = None
6035         else:
6036           if not utils.IsValidIP(nic_ip):
6037             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6038
6039       if nic_op == constants.DDM_ADD:
6040         nic_bridge = nic_dict.get('bridge', None)
6041         if nic_bridge is None:
6042           nic_dict['bridge'] = self.cfg.GetDefBridge()
6043         nic_mac = nic_dict.get('mac', None)
6044         if nic_mac is None:
6045           nic_dict['mac'] = constants.VALUE_AUTO
6046
6047       if 'mac' in nic_dict:
6048         nic_mac = nic_dict['mac']
6049         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6050           if not utils.IsValidMac(nic_mac):
6051             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6052         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6053           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6054                                      " modifying an existing nic")
6055
6056     if nic_addremove > 1:
6057       raise errors.OpPrereqError("Only one NIC add or remove operation"
6058                                  " supported at a time")
6059
6060   def ExpandNames(self):
6061     self._ExpandAndLockInstance()
6062     self.needed_locks[locking.LEVEL_NODE] = []
6063     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6064
6065   def DeclareLocks(self, level):
6066     if level == locking.LEVEL_NODE:
6067       self._LockInstancesNodes()
6068
6069   def BuildHooksEnv(self):
6070     """Build hooks env.
6071
6072     This runs on the master, primary and secondaries.
6073
6074     """
6075     args = dict()
6076     if constants.BE_MEMORY in self.be_new:
6077       args['memory'] = self.be_new[constants.BE_MEMORY]
6078     if constants.BE_VCPUS in self.be_new:
6079       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6080     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6081     # information at all.
6082     if self.op.nics:
6083       args['nics'] = []
6084       nic_override = dict(self.op.nics)
6085       for idx, nic in enumerate(self.instance.nics):
6086         if idx in nic_override:
6087           this_nic_override = nic_override[idx]
6088         else:
6089           this_nic_override = {}
6090         if 'ip' in this_nic_override:
6091           ip = this_nic_override['ip']
6092         else:
6093           ip = nic.ip
6094         if 'bridge' in this_nic_override:
6095           bridge = this_nic_override['bridge']
6096         else:
6097           bridge = nic.bridge
6098         if 'mac' in this_nic_override:
6099           mac = this_nic_override['mac']
6100         else:
6101           mac = nic.mac
6102         args['nics'].append((ip, bridge, mac))
6103       if constants.DDM_ADD in nic_override:
6104         ip = nic_override[constants.DDM_ADD].get('ip', None)
6105         bridge = nic_override[constants.DDM_ADD]['bridge']
6106         mac = nic_override[constants.DDM_ADD]['mac']
6107         args['nics'].append((ip, bridge, mac))
6108       elif constants.DDM_REMOVE in nic_override:
6109         del args['nics'][-1]
6110
6111     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6112     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6113     return env, nl, nl
6114
6115   def CheckPrereq(self):
6116     """Check prerequisites.
6117
6118     This only checks the instance list against the existing names.
6119
6120     """
6121     self.force = self.op.force
6122
6123     # checking the new params on the primary/secondary nodes
6124
6125     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6126     assert self.instance is not None, \
6127       "Cannot retrieve locked instance %s" % self.op.instance_name
6128     pnode = instance.primary_node
6129     nodelist = list(instance.all_nodes)
6130
6131     # hvparams processing
6132     if self.op.hvparams:
6133       i_hvdict = copy.deepcopy(instance.hvparams)
6134       for key, val in self.op.hvparams.iteritems():
6135         if val == constants.VALUE_DEFAULT:
6136           try:
6137             del i_hvdict[key]
6138           except KeyError:
6139             pass
6140         else:
6141           i_hvdict[key] = val
6142       cluster = self.cfg.GetClusterInfo()
6143       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6144       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6145                                 i_hvdict)
6146       # local check
6147       hypervisor.GetHypervisor(
6148         instance.hypervisor).CheckParameterSyntax(hv_new)
6149       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6150       self.hv_new = hv_new # the new actual values
6151       self.hv_inst = i_hvdict # the new dict (without defaults)
6152     else:
6153       self.hv_new = self.hv_inst = {}
6154
6155     # beparams processing
6156     if self.op.beparams:
6157       i_bedict = copy.deepcopy(instance.beparams)
6158       for key, val in self.op.beparams.iteritems():
6159         if val == constants.VALUE_DEFAULT:
6160           try:
6161             del i_bedict[key]
6162           except KeyError:
6163             pass
6164         else:
6165           i_bedict[key] = val
6166       cluster = self.cfg.GetClusterInfo()
6167       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6168       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6169                                 i_bedict)
6170       self.be_new = be_new # the new actual values
6171       self.be_inst = i_bedict # the new dict (without defaults)
6172     else:
6173       self.be_new = self.be_inst = {}
6174
6175     self.warn = []
6176
6177     if constants.BE_MEMORY in self.op.beparams and not self.force:
6178       mem_check_list = [pnode]
6179       if be_new[constants.BE_AUTO_BALANCE]:
6180         # either we changed auto_balance to yes or it was from before
6181         mem_check_list.extend(instance.secondary_nodes)
6182       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6183                                                   instance.hypervisor)
6184       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6185                                          instance.hypervisor)
6186       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6187         # Assume the primary node is unreachable and go ahead
6188         self.warn.append("Can't get info from primary node %s" % pnode)
6189       else:
6190         if not instance_info.failed and instance_info.data:
6191           current_mem = int(instance_info.data['memory'])
6192         else:
6193           # Assume instance not running
6194           # (there is a slight race condition here, but it's not very probable,
6195           # and we have no other way to check)
6196           current_mem = 0
6197         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6198                     nodeinfo[pnode].data['memory_free'])
6199         if miss_mem > 0:
6200           raise errors.OpPrereqError("This change will prevent the instance"
6201                                      " from starting, due to %d MB of memory"
6202                                      " missing on its primary node" % miss_mem)
6203
6204       if be_new[constants.BE_AUTO_BALANCE]:
6205         for node, nres in nodeinfo.iteritems():
6206           if node not in instance.secondary_nodes:
6207             continue
6208           if nres.failed or not isinstance(nres.data, dict):
6209             self.warn.append("Can't get info from secondary node %s" % node)
6210           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6211             self.warn.append("Not enough memory to failover instance to"
6212                              " secondary node %s" % node)
6213
6214     # NIC processing
6215     for nic_op, nic_dict in self.op.nics:
6216       if nic_op == constants.DDM_REMOVE:
6217         if not instance.nics:
6218           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6219         continue
6220       if nic_op != constants.DDM_ADD:
6221         # an existing nic
6222         if nic_op < 0 or nic_op >= len(instance.nics):
6223           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6224                                      " are 0 to %d" %
6225                                      (nic_op, len(instance.nics)))
6226       if 'bridge' in nic_dict:
6227         nic_bridge = nic_dict['bridge']
6228         if nic_bridge is None:
6229           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6230         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6231           msg = ("Bridge '%s' doesn't exist on one of"
6232                  " the instance nodes" % nic_bridge)
6233           if self.force:
6234             self.warn.append(msg)
6235           else:
6236             raise errors.OpPrereqError(msg)
6237       if 'mac' in nic_dict:
6238         nic_mac = nic_dict['mac']
6239         if nic_mac is None:
6240           raise errors.OpPrereqError('Cannot set the nic mac to None')
6241         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6242           # otherwise generate the mac
6243           nic_dict['mac'] = self.cfg.GenerateMAC()
6244         else:
6245           # or validate/reserve the current one
6246           if self.cfg.IsMacInUse(nic_mac):
6247             raise errors.OpPrereqError("MAC address %s already in use"
6248                                        " in cluster" % nic_mac)
6249
6250     # DISK processing
6251     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6252       raise errors.OpPrereqError("Disk operations not supported for"
6253                                  " diskless instances")
6254     for disk_op, disk_dict in self.op.disks:
6255       if disk_op == constants.DDM_REMOVE:
6256         if len(instance.disks) == 1:
6257           raise errors.OpPrereqError("Cannot remove the last disk of"
6258                                      " an instance")
6259         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6260         ins_l = ins_l[pnode]
6261         if ins_l.failed or not isinstance(ins_l.data, list):
6262           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6263         if instance.name in ins_l.data:
6264           raise errors.OpPrereqError("Instance is running, can't remove"
6265                                      " disks.")
6266
6267       if (disk_op == constants.DDM_ADD and
6268           len(instance.nics) >= constants.MAX_DISKS):
6269         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6270                                    " add more" % constants.MAX_DISKS)
6271       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6272         # an existing disk
6273         if disk_op < 0 or disk_op >= len(instance.disks):
6274           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6275                                      " are 0 to %d" %
6276                                      (disk_op, len(instance.disks)))
6277
6278     return
6279
6280   def Exec(self, feedback_fn):
6281     """Modifies an instance.
6282
6283     All parameters take effect only at the next restart of the instance.
6284
6285     """
6286     # Process here the warnings from CheckPrereq, as we don't have a
6287     # feedback_fn there.
6288     for warn in self.warn:
6289       feedback_fn("WARNING: %s" % warn)
6290
6291     result = []
6292     instance = self.instance
6293     # disk changes
6294     for disk_op, disk_dict in self.op.disks:
6295       if disk_op == constants.DDM_REMOVE:
6296         # remove the last disk
6297         device = instance.disks.pop()
6298         device_idx = len(instance.disks)
6299         for node, disk in device.ComputeNodeTree(instance.primary_node):
6300           self.cfg.SetDiskID(disk, node)
6301           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6302           if msg:
6303             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6304                             " continuing anyway", device_idx, node, msg)
6305         result.append(("disk/%d" % device_idx, "remove"))
6306       elif disk_op == constants.DDM_ADD:
6307         # add a new disk
6308         if instance.disk_template == constants.DT_FILE:
6309           file_driver, file_path = instance.disks[0].logical_id
6310           file_path = os.path.dirname(file_path)
6311         else:
6312           file_driver = file_path = None
6313         disk_idx_base = len(instance.disks)
6314         new_disk = _GenerateDiskTemplate(self,
6315                                          instance.disk_template,
6316                                          instance.name, instance.primary_node,
6317                                          instance.secondary_nodes,
6318                                          [disk_dict],
6319                                          file_path,
6320                                          file_driver,
6321                                          disk_idx_base)[0]
6322         instance.disks.append(new_disk)
6323         info = _GetInstanceInfoText(instance)
6324
6325         logging.info("Creating volume %s for instance %s",
6326                      new_disk.iv_name, instance.name)
6327         # Note: this needs to be kept in sync with _CreateDisks
6328         #HARDCODE
6329         for node in instance.all_nodes:
6330           f_create = node == instance.primary_node
6331           try:
6332             _CreateBlockDev(self, node, instance, new_disk,
6333                             f_create, info, f_create)
6334           except errors.OpExecError, err:
6335             self.LogWarning("Failed to create volume %s (%s) on"
6336                             " node %s: %s",
6337                             new_disk.iv_name, new_disk, node, err)
6338         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6339                        (new_disk.size, new_disk.mode)))
6340       else:
6341         # change a given disk
6342         instance.disks[disk_op].mode = disk_dict['mode']
6343         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6344     # NIC changes
6345     for nic_op, nic_dict in self.op.nics:
6346       if nic_op == constants.DDM_REMOVE:
6347         # remove the last nic
6348         del instance.nics[-1]
6349         result.append(("nic.%d" % len(instance.nics), "remove"))
6350       elif nic_op == constants.DDM_ADD:
6351         # mac and bridge should be set, by now
6352         mac = nic_dict['mac']
6353         bridge = nic_dict['bridge']
6354         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6355                               bridge=bridge)
6356         instance.nics.append(new_nic)
6357         result.append(("nic.%d" % (len(instance.nics) - 1),
6358                        "add:mac=%s,ip=%s,bridge=%s" %
6359                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6360       else:
6361         # change a given nic
6362         for key in 'mac', 'ip', 'bridge':
6363           if key in nic_dict:
6364             setattr(instance.nics[nic_op], key, nic_dict[key])
6365             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6366
6367     # hvparams changes
6368     if self.op.hvparams:
6369       instance.hvparams = self.hv_inst
6370       for key, val in self.op.hvparams.iteritems():
6371         result.append(("hv/%s" % key, val))
6372
6373     # beparams changes
6374     if self.op.beparams:
6375       instance.beparams = self.be_inst
6376       for key, val in self.op.beparams.iteritems():
6377         result.append(("be/%s" % key, val))
6378
6379     self.cfg.Update(instance)
6380
6381     return result
6382
6383
6384 class LUQueryExports(NoHooksLU):
6385   """Query the exports list
6386
6387   """
6388   _OP_REQP = ['nodes']
6389   REQ_BGL = False
6390
6391   def ExpandNames(self):
6392     self.needed_locks = {}
6393     self.share_locks[locking.LEVEL_NODE] = 1
6394     if not self.op.nodes:
6395       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6396     else:
6397       self.needed_locks[locking.LEVEL_NODE] = \
6398         _GetWantedNodes(self, self.op.nodes)
6399
6400   def CheckPrereq(self):
6401     """Check prerequisites.
6402
6403     """
6404     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6405
6406   def Exec(self, feedback_fn):
6407     """Compute the list of all the exported system images.
6408
6409     @rtype: dict
6410     @return: a dictionary with the structure node->(export-list)
6411         where export-list is a list of the instances exported on
6412         that node.
6413
6414     """
6415     rpcresult = self.rpc.call_export_list(self.nodes)
6416     result = {}
6417     for node in rpcresult:
6418       if rpcresult[node].failed:
6419         result[node] = False
6420       else:
6421         result[node] = rpcresult[node].data
6422
6423     return result
6424
6425
6426 class LUExportInstance(LogicalUnit):
6427   """Export an instance to an image in the cluster.
6428
6429   """
6430   HPATH = "instance-export"
6431   HTYPE = constants.HTYPE_INSTANCE
6432   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6433   REQ_BGL = False
6434
6435   def ExpandNames(self):
6436     self._ExpandAndLockInstance()
6437     # FIXME: lock only instance primary and destination node
6438     #
6439     # Sad but true, for now we have do lock all nodes, as we don't know where
6440     # the previous export might be, and and in this LU we search for it and
6441     # remove it from its current node. In the future we could fix this by:
6442     #  - making a tasklet to search (share-lock all), then create the new one,
6443     #    then one to remove, after
6444     #  - removing the removal operation altogether
6445     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6446
6447   def DeclareLocks(self, level):
6448     """Last minute lock declaration."""
6449     # All nodes are locked anyway, so nothing to do here.
6450
6451   def BuildHooksEnv(self):
6452     """Build hooks env.
6453
6454     This will run on the master, primary node and target node.
6455
6456     """
6457     env = {
6458       "EXPORT_NODE": self.op.target_node,
6459       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6460       }
6461     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6462     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6463           self.op.target_node]
6464     return env, nl, nl
6465
6466   def CheckPrereq(self):
6467     """Check prerequisites.
6468
6469     This checks that the instance and node names are valid.
6470
6471     """
6472     instance_name = self.op.instance_name
6473     self.instance = self.cfg.GetInstanceInfo(instance_name)
6474     assert self.instance is not None, \
6475           "Cannot retrieve locked instance %s" % self.op.instance_name
6476     _CheckNodeOnline(self, self.instance.primary_node)
6477
6478     self.dst_node = self.cfg.GetNodeInfo(
6479       self.cfg.ExpandNodeName(self.op.target_node))
6480
6481     if self.dst_node is None:
6482       # This is wrong node name, not a non-locked node
6483       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6484     _CheckNodeOnline(self, self.dst_node.name)
6485     _CheckNodeNotDrained(self, self.dst_node.name)
6486
6487     # instance disk type verification
6488     for disk in self.instance.disks:
6489       if disk.dev_type == constants.LD_FILE:
6490         raise errors.OpPrereqError("Export not supported for instances with"
6491                                    " file-based disks")
6492
6493   def Exec(self, feedback_fn):
6494     """Export an instance to an image in the cluster.
6495
6496     """
6497     instance = self.instance
6498     dst_node = self.dst_node
6499     src_node = instance.primary_node
6500     if self.op.shutdown:
6501       # shutdown the instance, but not the disks
6502       result = self.rpc.call_instance_shutdown(src_node, instance)
6503       msg = result.RemoteFailMsg()
6504       if msg:
6505         raise errors.OpExecError("Could not shutdown instance %s on"
6506                                  " node %s: %s" %
6507                                  (instance.name, src_node, msg))
6508
6509     vgname = self.cfg.GetVGName()
6510
6511     snap_disks = []
6512
6513     # set the disks ID correctly since call_instance_start needs the
6514     # correct drbd minor to create the symlinks
6515     for disk in instance.disks:
6516       self.cfg.SetDiskID(disk, src_node)
6517
6518     # per-disk results
6519     dresults = []
6520     try:
6521       for idx, disk in enumerate(instance.disks):
6522         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6523         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6524         if new_dev_name.failed or not new_dev_name.data:
6525           self.LogWarning("Could not snapshot disk/%d on node %s",
6526                           idx, src_node)
6527           snap_disks.append(False)
6528         else:
6529           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6530                                  logical_id=(vgname, new_dev_name.data),
6531                                  physical_id=(vgname, new_dev_name.data),
6532                                  iv_name=disk.iv_name)
6533           snap_disks.append(new_dev)
6534
6535     finally:
6536       if self.op.shutdown and instance.admin_up:
6537         result = self.rpc.call_instance_start(src_node, instance, None, None)
6538         msg = result.RemoteFailMsg()
6539         if msg:
6540           _ShutdownInstanceDisks(self, instance)
6541           raise errors.OpExecError("Could not start instance: %s" % msg)
6542
6543     # TODO: check for size
6544
6545     cluster_name = self.cfg.GetClusterName()
6546     for idx, dev in enumerate(snap_disks):
6547       if dev:
6548         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6549                                                instance, cluster_name, idx)
6550         if result.failed or not result.data:
6551           self.LogWarning("Could not export disk/%d from node %s to"
6552                           " node %s", idx, src_node, dst_node.name)
6553           dresults.append(False)
6554         else:
6555           dresults.append(True)
6556         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6557         if msg:
6558           self.LogWarning("Could not remove snapshot for disk/%d from node"
6559                           " %s: %s", idx, src_node, msg)
6560       else:
6561         dresults.append(False)
6562
6563     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6564     fin_resu = True
6565     if result.failed or not result.data:
6566       self.LogWarning("Could not finalize export for instance %s on node %s",
6567                       instance.name, dst_node.name)
6568       fin_resu = False
6569
6570     nodelist = self.cfg.GetNodeList()
6571     nodelist.remove(dst_node.name)
6572
6573     # on one-node clusters nodelist will be empty after the removal
6574     # if we proceed the backup would be removed because OpQueryExports
6575     # substitutes an empty list with the full cluster node list.
6576     if nodelist:
6577       exportlist = self.rpc.call_export_list(nodelist)
6578       for node in exportlist:
6579         if exportlist[node].failed:
6580           continue
6581         if instance.name in exportlist[node].data:
6582           if not self.rpc.call_export_remove(node, instance.name):
6583             self.LogWarning("Could not remove older export for instance %s"
6584                             " on node %s", instance.name, node)
6585     return fin_resu, dresults
6586
6587
6588 class LURemoveExport(NoHooksLU):
6589   """Remove exports related to the named instance.
6590
6591   """
6592   _OP_REQP = ["instance_name"]
6593   REQ_BGL = False
6594
6595   def ExpandNames(self):
6596     self.needed_locks = {}
6597     # We need all nodes to be locked in order for RemoveExport to work, but we
6598     # don't need to lock the instance itself, as nothing will happen to it (and
6599     # we can remove exports also for a removed instance)
6600     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6601
6602   def CheckPrereq(self):
6603     """Check prerequisites.
6604     """
6605     pass
6606
6607   def Exec(self, feedback_fn):
6608     """Remove any export.
6609
6610     """
6611     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6612     # If the instance was not found we'll try with the name that was passed in.
6613     # This will only work if it was an FQDN, though.
6614     fqdn_warn = False
6615     if not instance_name:
6616       fqdn_warn = True
6617       instance_name = self.op.instance_name
6618
6619     exportlist = self.rpc.call_export_list(self.acquired_locks[
6620       locking.LEVEL_NODE])
6621     found = False
6622     for node in exportlist:
6623       if exportlist[node].failed:
6624         self.LogWarning("Failed to query node %s, continuing" % node)
6625         continue
6626       if instance_name in exportlist[node].data:
6627         found = True
6628         result = self.rpc.call_export_remove(node, instance_name)
6629         if result.failed or not result.data:
6630           logging.error("Could not remove export for instance %s"
6631                         " on node %s", instance_name, node)
6632
6633     if fqdn_warn and not found:
6634       feedback_fn("Export not found. If trying to remove an export belonging"
6635                   " to a deleted instance please use its Fully Qualified"
6636                   " Domain Name.")
6637
6638
6639 class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
6640   """Generic tags LU.
6641
6642   This is an abstract class which is the parent of all the other tags LUs.
6643
6644   """
6645
6646   def ExpandNames(self):
6647     self.needed_locks = {}
6648     if self.op.kind == constants.TAG_NODE:
6649       name = self.cfg.ExpandNodeName(self.op.name)
6650       if name is None:
6651         raise errors.OpPrereqError("Invalid node name (%s)" %
6652                                    (self.op.name,))
6653       self.op.name = name
6654       self.needed_locks[locking.LEVEL_NODE] = name
6655     elif self.op.kind == constants.TAG_INSTANCE:
6656       name = self.cfg.ExpandInstanceName(self.op.name)
6657       if name is None:
6658         raise errors.OpPrereqError("Invalid instance name (%s)" %
6659                                    (self.op.name,))
6660       self.op.name = name
6661       self.needed_locks[locking.LEVEL_INSTANCE] = name
6662
6663   def CheckPrereq(self):
6664     """Check prerequisites.
6665
6666     """
6667     if self.op.kind == constants.TAG_CLUSTER:
6668       self.target = self.cfg.GetClusterInfo()
6669     elif self.op.kind == constants.TAG_NODE:
6670       self.target = self.cfg.GetNodeInfo(self.op.name)
6671     elif self.op.kind == constants.TAG_INSTANCE:
6672       self.target = self.cfg.GetInstanceInfo(self.op.name)
6673     else:
6674       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6675                                  str(self.op.kind))
6676
6677
6678 class LUGetTags(TagsLU):
6679   """Returns the tags of a given object.
6680
6681   """
6682   _OP_REQP = ["kind", "name"]
6683   REQ_BGL = False
6684
6685   def Exec(self, feedback_fn):
6686     """Returns the tag list.
6687
6688     """
6689     return list(self.target.GetTags())
6690
6691
6692 class LUSearchTags(NoHooksLU):
6693   """Searches the tags for a given pattern.
6694
6695   """
6696   _OP_REQP = ["pattern"]
6697   REQ_BGL = False
6698
6699   def ExpandNames(self):
6700     self.needed_locks = {}
6701
6702   def CheckPrereq(self):
6703     """Check prerequisites.
6704
6705     This checks the pattern passed for validity by compiling it.
6706
6707     """
6708     try:
6709       self.re = re.compile(self.op.pattern)
6710     except re.error, err:
6711       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6712                                  (self.op.pattern, err))
6713
6714   def Exec(self, feedback_fn):
6715     """Returns the tag list.
6716
6717     """
6718     cfg = self.cfg
6719     tgts = [("/cluster", cfg.GetClusterInfo())]
6720     ilist = cfg.GetAllInstancesInfo().values()
6721     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6722     nlist = cfg.GetAllNodesInfo().values()
6723     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6724     results = []
6725     for path, target in tgts:
6726       for tag in target.GetTags():
6727         if self.re.search(tag):
6728           results.append((path, tag))
6729     return results
6730
6731
6732 class LUAddTags(TagsLU):
6733   """Sets a tag on a given object.
6734
6735   """
6736   _OP_REQP = ["kind", "name", "tags"]
6737   REQ_BGL = False
6738
6739   def CheckPrereq(self):
6740     """Check prerequisites.
6741
6742     This checks the type and length of the tag name and value.
6743
6744     """
6745     TagsLU.CheckPrereq(self)
6746     for tag in self.op.tags:
6747       objects.TaggableObject.ValidateTag(tag)
6748
6749   def Exec(self, feedback_fn):
6750     """Sets the tag.
6751
6752     """
6753     try:
6754       for tag in self.op.tags:
6755         self.target.AddTag(tag)
6756     except errors.TagError, err:
6757       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6758     try:
6759       self.cfg.Update(self.target)
6760     except errors.ConfigurationError:
6761       raise errors.OpRetryError("There has been a modification to the"
6762                                 " config file and the operation has been"
6763                                 " aborted. Please retry.")
6764
6765
6766 class LUDelTags(TagsLU):
6767   """Delete a list of tags from a given object.
6768
6769   """
6770   _OP_REQP = ["kind", "name", "tags"]
6771   REQ_BGL = False
6772
6773   def CheckPrereq(self):
6774     """Check prerequisites.
6775
6776     This checks that we have the given tag.
6777
6778     """
6779     TagsLU.CheckPrereq(self)
6780     for tag in self.op.tags:
6781       objects.TaggableObject.ValidateTag(tag)
6782     del_tags = frozenset(self.op.tags)
6783     cur_tags = self.target.GetTags()
6784     if not del_tags <= cur_tags:
6785       diff_tags = del_tags - cur_tags
6786       diff_names = ["'%s'" % tag for tag in diff_tags]
6787       diff_names.sort()
6788       raise errors.OpPrereqError("Tag(s) %s not found" %
6789                                  (",".join(diff_names)))
6790
6791   def Exec(self, feedback_fn):
6792     """Remove the tag from the object.
6793
6794     """
6795     for tag in self.op.tags:
6796       self.target.RemoveTag(tag)
6797     try:
6798       self.cfg.Update(self.target)
6799     except errors.ConfigurationError:
6800       raise errors.OpRetryError("There has been a modification to the"
6801                                 " config file and the operation has been"
6802                                 " aborted. Please retry.")
6803
6804
6805 class LUTestDelay(NoHooksLU):
6806   """Sleep for a specified amount of time.
6807
6808   This LU sleeps on the master and/or nodes for a specified amount of
6809   time.
6810
6811   """
6812   _OP_REQP = ["duration", "on_master", "on_nodes"]
6813   REQ_BGL = False
6814
6815   def ExpandNames(self):
6816     """Expand names and set required locks.
6817
6818     This expands the node list, if any.
6819
6820     """
6821     self.needed_locks = {}
6822     if self.op.on_nodes:
6823       # _GetWantedNodes can be used here, but is not always appropriate to use
6824       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6825       # more information.
6826       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6827       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6828
6829   def CheckPrereq(self):
6830     """Check prerequisites.
6831
6832     """
6833
6834   def Exec(self, feedback_fn):
6835     """Do the actual sleep.
6836
6837     """
6838     if self.op.on_master:
6839       if not utils.TestDelay(self.op.duration):
6840         raise errors.OpExecError("Error during master delay test")
6841     if self.op.on_nodes:
6842       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6843       if not result:
6844         raise errors.OpExecError("Complete failure from rpc call")
6845       for node, node_result in result.items():
6846         node_result.Raise()
6847         if not node_result.data:
6848           raise errors.OpExecError("Failure during rpc call to node %s,"
6849                                    " result: %s" % (node, node_result.data))
6850
6851
6852 class IAllocator(object):
6853   """IAllocator framework.
6854
6855   An IAllocator instance has three sets of attributes:
6856     - cfg that is needed to query the cluster
6857     - input data (all members of the _KEYS class attribute are required)
6858     - four buffer attributes (in|out_data|text), that represent the
6859       input (to the external script) in text and data structure format,
6860       and the output from it, again in two formats
6861     - the result variables from the script (success, info, nodes) for
6862       easy usage
6863
6864   """
6865   _ALLO_KEYS = [
6866     "mem_size", "disks", "disk_template",
6867     "os", "tags", "nics", "vcpus", "hypervisor",
6868     ]
6869   _RELO_KEYS = [
6870     "relocate_from",
6871     ]
6872
6873   def __init__(self, lu, mode, name, **kwargs):
6874     self.lu = lu
6875     # init buffer variables
6876     self.in_text = self.out_text = self.in_data = self.out_data = None
6877     # init all input fields so that pylint is happy
6878     self.mode = mode
6879     self.name = name
6880     self.mem_size = self.disks = self.disk_template = None
6881     self.os = self.tags = self.nics = self.vcpus = None
6882     self.hypervisor = None
6883     self.relocate_from = None
6884     # computed fields
6885     self.required_nodes = None
6886     # init result fields
6887     self.success = self.info = self.nodes = None
6888     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6889       keyset = self._ALLO_KEYS
6890     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6891       keyset = self._RELO_KEYS
6892     else:
6893       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6894                                    " IAllocator" % self.mode)
6895     for key in kwargs:
6896       if key not in keyset:
6897         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6898                                      " IAllocator" % key)
6899       setattr(self, key, kwargs[key])
6900     for key in keyset:
6901       if key not in kwargs:
6902         raise errors.ProgrammerError("Missing input parameter '%s' to"
6903                                      " IAllocator" % key)
6904     self._BuildInputData()
6905
6906   def _ComputeClusterData(self):
6907     """Compute the generic allocator input data.
6908
6909     This is the data that is independent of the actual operation.
6910
6911     """
6912     cfg = self.lu.cfg
6913     cluster_info = cfg.GetClusterInfo()
6914     # cluster data
6915     data = {
6916       "version": constants.IALLOCATOR_VERSION,
6917       "cluster_name": cfg.GetClusterName(),
6918       "cluster_tags": list(cluster_info.GetTags()),
6919       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6920       # we don't have job IDs
6921       }
6922     iinfo = cfg.GetAllInstancesInfo().values()
6923     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6924
6925     # node data
6926     node_results = {}
6927     node_list = cfg.GetNodeList()
6928
6929     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6930       hypervisor_name = self.hypervisor
6931     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6932       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6933
6934     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6935                                            hypervisor_name)
6936     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6937                        cluster_info.enabled_hypervisors)
6938     for nname, nresult in node_data.items():
6939       # first fill in static (config-based) values
6940       ninfo = cfg.GetNodeInfo(nname)
6941       pnr = {
6942         "tags": list(ninfo.GetTags()),
6943         "primary_ip": ninfo.primary_ip,
6944         "secondary_ip": ninfo.secondary_ip,
6945         "offline": ninfo.offline,
6946         "drained": ninfo.drained,
6947         "master_candidate": ninfo.master_candidate,
6948         }
6949
6950       if not (ninfo.offline or ninfo.drained):
6951         nresult.Raise()
6952         if not isinstance(nresult.data, dict):
6953           raise errors.OpExecError("Can't get data for node %s" % nname)
6954         remote_info = nresult.data
6955         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6956                      'vg_size', 'vg_free', 'cpu_total']:
6957           if attr not in remote_info:
6958             raise errors.OpExecError("Node '%s' didn't return attribute"
6959                                      " '%s'" % (nname, attr))
6960           try:
6961             remote_info[attr] = int(remote_info[attr])
6962           except ValueError, err:
6963             raise errors.OpExecError("Node '%s' returned invalid value"
6964                                      " for '%s': %s" % (nname, attr, err))
6965         # compute memory used by primary instances
6966         i_p_mem = i_p_up_mem = 0
6967         for iinfo, beinfo in i_list:
6968           if iinfo.primary_node == nname:
6969             i_p_mem += beinfo[constants.BE_MEMORY]
6970             if iinfo.name not in node_iinfo[nname].data:
6971               i_used_mem = 0
6972             else:
6973               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6974             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6975             remote_info['memory_free'] -= max(0, i_mem_diff)
6976
6977             if iinfo.admin_up:
6978               i_p_up_mem += beinfo[constants.BE_MEMORY]
6979
6980         # compute memory used by instances
6981         pnr_dyn = {
6982           "total_memory": remote_info['memory_total'],
6983           "reserved_memory": remote_info['memory_dom0'],
6984           "free_memory": remote_info['memory_free'],
6985           "total_disk": remote_info['vg_size'],
6986           "free_disk": remote_info['vg_free'],
6987           "total_cpus": remote_info['cpu_total'],
6988           "i_pri_memory": i_p_mem,
6989           "i_pri_up_memory": i_p_up_mem,
6990           }
6991         pnr.update(pnr_dyn)
6992
6993       node_results[nname] = pnr
6994     data["nodes"] = node_results
6995
6996     # instance data
6997     instance_data = {}
6998     for iinfo, beinfo in i_list:
6999       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
7000                   for n in iinfo.nics]
7001       pir = {
7002         "tags": list(iinfo.GetTags()),
7003         "admin_up": iinfo.admin_up,
7004         "vcpus": beinfo[constants.BE_VCPUS],
7005         "memory": beinfo[constants.BE_MEMORY],
7006         "os": iinfo.os,
7007         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7008         "nics": nic_data,
7009         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7010         "disk_template": iinfo.disk_template,
7011         "hypervisor": iinfo.hypervisor,
7012         }
7013       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7014                                                  pir["disks"])
7015       instance_data[iinfo.name] = pir
7016
7017     data["instances"] = instance_data
7018
7019     self.in_data = data
7020
7021   def _AddNewInstance(self):
7022     """Add new instance data to allocator structure.
7023
7024     This in combination with _AllocatorGetClusterData will create the
7025     correct structure needed as input for the allocator.
7026
7027     The checks for the completeness of the opcode must have already been
7028     done.
7029
7030     """
7031     data = self.in_data
7032
7033     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7034
7035     if self.disk_template in constants.DTS_NET_MIRROR:
7036       self.required_nodes = 2
7037     else:
7038       self.required_nodes = 1
7039     request = {
7040       "type": "allocate",
7041       "name": self.name,
7042       "disk_template": self.disk_template,
7043       "tags": self.tags,
7044       "os": self.os,
7045       "vcpus": self.vcpus,
7046       "memory": self.mem_size,
7047       "disks": self.disks,
7048       "disk_space_total": disk_space,
7049       "nics": self.nics,
7050       "required_nodes": self.required_nodes,
7051       }
7052     data["request"] = request
7053
7054   def _AddRelocateInstance(self):
7055     """Add relocate instance data to allocator structure.
7056
7057     This in combination with _IAllocatorGetClusterData will create the
7058     correct structure needed as input for the allocator.
7059
7060     The checks for the completeness of the opcode must have already been
7061     done.
7062
7063     """
7064     instance = self.lu.cfg.GetInstanceInfo(self.name)
7065     if instance is None:
7066       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7067                                    " IAllocator" % self.name)
7068
7069     if instance.disk_template not in constants.DTS_NET_MIRROR:
7070       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7071
7072     if len(instance.secondary_nodes) != 1:
7073       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7074
7075     self.required_nodes = 1
7076     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7077     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7078
7079     request = {
7080       "type": "relocate",
7081       "name": self.name,
7082       "disk_space_total": disk_space,
7083       "required_nodes": self.required_nodes,
7084       "relocate_from": self.relocate_from,
7085       }
7086     self.in_data["request"] = request
7087
7088   def _BuildInputData(self):
7089     """Build input data structures.
7090
7091     """
7092     self._ComputeClusterData()
7093
7094     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7095       self._AddNewInstance()
7096     else:
7097       self._AddRelocateInstance()
7098
7099     self.in_text = serializer.Dump(self.in_data)
7100
7101   def Run(self, name, validate=True, call_fn=None):
7102     """Run an instance allocator and return the results.
7103
7104     """
7105     if call_fn is None:
7106       call_fn = self.lu.rpc.call_iallocator_runner
7107
7108     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7109     result.Raise()
7110
7111     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7112       raise errors.OpExecError("Invalid result from master iallocator runner")
7113
7114     rcode, stdout, stderr, fail = result.data
7115
7116     if rcode == constants.IARUN_NOTFOUND:
7117       raise errors.OpExecError("Can't find allocator '%s'" % name)
7118     elif rcode == constants.IARUN_FAILURE:
7119       raise errors.OpExecError("Instance allocator call failed: %s,"
7120                                " output: %s" % (fail, stdout+stderr))
7121     self.out_text = stdout
7122     if validate:
7123       self._ValidateResult()
7124
7125   def _ValidateResult(self):
7126     """Process the allocator results.
7127
7128     This will process and if successful save the result in
7129     self.out_data and the other parameters.
7130
7131     """
7132     try:
7133       rdict = serializer.Load(self.out_text)
7134     except Exception, err:
7135       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7136
7137     if not isinstance(rdict, dict):
7138       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7139
7140     for key in "success", "info", "nodes":
7141       if key not in rdict:
7142         raise errors.OpExecError("Can't parse iallocator results:"
7143                                  " missing key '%s'" % key)
7144       setattr(self, key, rdict[key])
7145
7146     if not isinstance(rdict["nodes"], list):
7147       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7148                                " is not a list")
7149     self.out_data = rdict
7150
7151
7152 class LUTestAllocator(NoHooksLU):
7153   """Run allocator tests.
7154
7155   This LU runs the allocator tests
7156
7157   """
7158   _OP_REQP = ["direction", "mode", "name"]
7159
7160   def CheckPrereq(self):
7161     """Check prerequisites.
7162
7163     This checks the opcode parameters depending on the director and mode test.
7164
7165     """
7166     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7167       for attr in ["name", "mem_size", "disks", "disk_template",
7168                    "os", "tags", "nics", "vcpus"]:
7169         if not hasattr(self.op, attr):
7170           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7171                                      attr)
7172       iname = self.cfg.ExpandInstanceName(self.op.name)
7173       if iname is not None:
7174         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7175                                    iname)
7176       if not isinstance(self.op.nics, list):
7177         raise errors.OpPrereqError("Invalid parameter 'nics'")
7178       for row in self.op.nics:
7179         if (not isinstance(row, dict) or
7180             "mac" not in row or
7181             "ip" not in row or
7182             "bridge" not in row):
7183           raise errors.OpPrereqError("Invalid contents of the"
7184                                      " 'nics' parameter")
7185       if not isinstance(self.op.disks, list):
7186         raise errors.OpPrereqError("Invalid parameter 'disks'")
7187       for row in self.op.disks:
7188         if (not isinstance(row, dict) or
7189             "size" not in row or
7190             not isinstance(row["size"], int) or
7191             "mode" not in row or
7192             row["mode"] not in ['r', 'w']):
7193           raise errors.OpPrereqError("Invalid contents of the"
7194                                      " 'disks' parameter")
7195       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7196         self.op.hypervisor = self.cfg.GetHypervisorType()
7197     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7198       if not hasattr(self.op, "name"):
7199         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7200       fname = self.cfg.ExpandInstanceName(self.op.name)
7201       if fname is None:
7202         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7203                                    self.op.name)
7204       self.op.name = fname
7205       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7206     else:
7207       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7208                                  self.op.mode)
7209
7210     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7211       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7212         raise errors.OpPrereqError("Missing allocator name")
7213     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7214       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7215                                  self.op.direction)
7216
7217   def Exec(self, feedback_fn):
7218     """Run the allocator test.
7219
7220     """
7221     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7222       ial = IAllocator(self,
7223                        mode=self.op.mode,
7224                        name=self.op.name,
7225                        mem_size=self.op.mem_size,
7226                        disks=self.op.disks,
7227                        disk_template=self.op.disk_template,
7228                        os=self.op.os,
7229                        tags=self.op.tags,
7230                        nics=self.op.nics,
7231                        vcpus=self.op.vcpus,
7232                        hypervisor=self.op.hypervisor,
7233                        )
7234     else:
7235       ial = IAllocator(self,
7236                        mode=self.op.mode,
7237                        name=self.op.name,
7238                        relocate_from=list(self.relocate_from),
7239                        )
7240
7241     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7242       result = ial.in_text
7243     else:
7244       ial.Run(self.op.allocator, validate=False)
7245       result = ial.out_text
7246     return result