Fix the confusing ssh/hostname message in node add
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overridden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have acquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor_name):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, bridge, mac) representing
475       the NICs the instance  has
476   @type disk_template: string
477   @param disk_template: the disk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor_name: string
485   @param hypervisor_name: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor_name,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, bridge, mac) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_BRIDGE" % idx] = bridge
514       env["INSTANCE_NIC%d_MAC" % idx] = mac
515   else:
516     nic_count = 0
517
518   env["INSTANCE_NIC_COUNT"] = nic_count
519
520   if disks:
521     disk_count = len(disks)
522     for idx, (size, mode) in enumerate(disks):
523       env["INSTANCE_DISK%d_SIZE" % idx] = size
524       env["INSTANCE_DISK%d_MODE" % idx] = mode
525   else:
526     disk_count = 0
527
528   env["INSTANCE_DISK_COUNT"] = disk_count
529
530   for source, kind in [(bep, "BE"), (hvp, "HV")]:
531     for key, value in source.items():
532       env["INSTANCE_%s_%s" % (kind, key)] = value
533
534   return env
535
536
537 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
538   """Builds instance related env variables for hooks from an object.
539
540   @type lu: L{LogicalUnit}
541   @param lu: the logical unit on whose behalf we execute
542   @type instance: L{objects.Instance}
543   @param instance: the instance for which we should build the
544       environment
545   @type override: dict
546   @param override: dictionary with key/values that will override
547       our values
548   @rtype: dict
549   @return: the hook environment dictionary
550
551   """
552   cluster = lu.cfg.GetClusterInfo()
553   bep = cluster.FillBE(instance)
554   hvp = cluster.FillHV(instance)
555   args = {
556     'name': instance.name,
557     'primary_node': instance.primary_node,
558     'secondary_nodes': instance.secondary_nodes,
559     'os_type': instance.os,
560     'status': instance.admin_up,
561     'memory': bep[constants.BE_MEMORY],
562     'vcpus': bep[constants.BE_VCPUS],
563     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
564     'disk_template': instance.disk_template,
565     'disks': [(disk.size, disk.mode) for disk in instance.disks],
566     'bep': bep,
567     'hvp': hvp,
568     'hypervisor_name': instance.hypervisor,
569   }
570   if override:
571     args.update(override)
572   return _BuildInstanceHookEnv(**args)
573
574
575 def _AdjustCandidatePool(lu):
576   """Adjust the candidate pool after node operations.
577
578   """
579   mod_list = lu.cfg.MaintainCandidatePool()
580   if mod_list:
581     lu.LogInfo("Promoted nodes to master candidate role: %s",
582                ", ".join(node.name for node in mod_list))
583     for name in mod_list:
584       lu.context.ReaddNode(name)
585   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
586   if mc_now > mc_max:
587     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
588                (mc_now, mc_max))
589
590
591 def _CheckInstanceBridgesExist(lu, instance):
592   """Check that the bridges needed by an instance exist.
593
594   """
595   # check bridges existence
596   brlist = [nic.bridge for nic in instance.nics]
597   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
598   result.Raise()
599   if not result.data:
600     raise errors.OpPrereqError("One or more target bridges %s does not"
601                                " exist on destination node '%s'" %
602                                (brlist, instance.primary_node))
603
604
605 class LUDestroyCluster(NoHooksLU):
606   """Logical unit for destroying the cluster.
607
608   """
609   _OP_REQP = []
610
611   def CheckPrereq(self):
612     """Check prerequisites.
613
614     This checks whether the cluster is empty.
615
616     Any errors are signaled by raising errors.OpPrereqError.
617
618     """
619     master = self.cfg.GetMasterNode()
620
621     nodelist = self.cfg.GetNodeList()
622     if len(nodelist) != 1 or nodelist[0] != master:
623       raise errors.OpPrereqError("There are still %d node(s) in"
624                                  " this cluster." % (len(nodelist) - 1))
625     instancelist = self.cfg.GetInstanceList()
626     if instancelist:
627       raise errors.OpPrereqError("There are still %d instance(s) in"
628                                  " this cluster." % len(instancelist))
629
630   def Exec(self, feedback_fn):
631     """Destroys the cluster.
632
633     """
634     master = self.cfg.GetMasterNode()
635     result = self.rpc.call_node_stop_master(master, False)
636     result.Raise()
637     if not result.data:
638       raise errors.OpExecError("Could not disable the master role")
639     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
640     utils.CreateBackup(priv_key)
641     utils.CreateBackup(pub_key)
642     return master
643
644
645 class LUVerifyCluster(LogicalUnit):
646   """Verifies the cluster status.
647
648   """
649   HPATH = "cluster-verify"
650   HTYPE = constants.HTYPE_CLUSTER
651   _OP_REQP = ["skip_checks"]
652   REQ_BGL = False
653
654   def ExpandNames(self):
655     self.needed_locks = {
656       locking.LEVEL_NODE: locking.ALL_SET,
657       locking.LEVEL_INSTANCE: locking.ALL_SET,
658     }
659     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
660
661   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
662                   node_result, feedback_fn, master_files,
663                   drbd_map, vg_name):
664     """Run multiple tests against a node.
665
666     Test list:
667
668       - compares ganeti version
669       - checks vg existence and size > 20G
670       - checks config file checksum
671       - checks ssh to other nodes
672
673     @type nodeinfo: L{objects.Node}
674     @param nodeinfo: the node to check
675     @param file_list: required list of files
676     @param local_cksum: dictionary of local files and their checksums
677     @param node_result: the results from the node
678     @param feedback_fn: function used to accumulate results
679     @param master_files: list of files that only masters should have
680     @param drbd_map: the useddrbd minors for this node, in
681         form of minor: (instance, must_exist) which correspond to instances
682         and their running status
683     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
684
685     """
686     node = nodeinfo.name
687
688     # main result, node_result should be a non-empty dict
689     if not node_result or not isinstance(node_result, dict):
690       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
691       return True
692
693     # compares ganeti version
694     local_version = constants.PROTOCOL_VERSION
695     remote_version = node_result.get('version', None)
696     if not (remote_version and isinstance(remote_version, (list, tuple)) and
697             len(remote_version) == 2):
698       feedback_fn("  - ERROR: connection to %s failed" % (node))
699       return True
700
701     if local_version != remote_version[0]:
702       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
703                   " node %s %s" % (local_version, node, remote_version[0]))
704       return True
705
706     # node seems compatible, we can actually try to look into its results
707
708     bad = False
709
710     # full package version
711     if constants.RELEASE_VERSION != remote_version[1]:
712       feedback_fn("  - WARNING: software version mismatch: master %s,"
713                   " node %s %s" %
714                   (constants.RELEASE_VERSION, node, remote_version[1]))
715
716     # checks vg existence and size > 20G
717     if vg_name is not None:
718       vglist = node_result.get(constants.NV_VGLIST, None)
719       if not vglist:
720         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
721                         (node,))
722         bad = True
723       else:
724         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
725                                               constants.MIN_VG_SIZE)
726         if vgstatus:
727           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
728           bad = True
729
730     # checks config file checksum
731
732     remote_cksum = node_result.get(constants.NV_FILELIST, None)
733     if not isinstance(remote_cksum, dict):
734       bad = True
735       feedback_fn("  - ERROR: node hasn't returned file checksum data")
736     else:
737       for file_name in file_list:
738         node_is_mc = nodeinfo.master_candidate
739         must_have_file = file_name not in master_files
740         if file_name not in remote_cksum:
741           if node_is_mc or must_have_file:
742             bad = True
743             feedback_fn("  - ERROR: file '%s' missing" % file_name)
744         elif remote_cksum[file_name] != local_cksum[file_name]:
745           if node_is_mc or must_have_file:
746             bad = True
747             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
748           else:
749             # not candidate and this is not a must-have file
750             bad = True
751             feedback_fn("  - ERROR: file '%s' should not exist on non master"
752                         " candidates (and the file is outdated)" % file_name)
753         else:
754           # all good, except non-master/non-must have combination
755           if not node_is_mc and not must_have_file:
756             feedback_fn("  - ERROR: file '%s' should not exist on non master"
757                         " candidates" % file_name)
758
759     # checks ssh to any
760
761     if constants.NV_NODELIST not in node_result:
762       bad = True
763       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
764     else:
765       if node_result[constants.NV_NODELIST]:
766         bad = True
767         for node in node_result[constants.NV_NODELIST]:
768           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
769                           (node, node_result[constants.NV_NODELIST][node]))
770
771     if constants.NV_NODENETTEST not in node_result:
772       bad = True
773       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
774     else:
775       if node_result[constants.NV_NODENETTEST]:
776         bad = True
777         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
778         for node in nlist:
779           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
780                           (node, node_result[constants.NV_NODENETTEST][node]))
781
782     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
783     if isinstance(hyp_result, dict):
784       for hv_name, hv_result in hyp_result.iteritems():
785         if hv_result is not None:
786           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
787                       (hv_name, hv_result))
788
789     # check used drbd list
790     if vg_name is not None:
791       used_minors = node_result.get(constants.NV_DRBDLIST, [])
792       if not isinstance(used_minors, (tuple, list)):
793         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
794                     str(used_minors))
795       else:
796         for minor, (iname, must_exist) in drbd_map.items():
797           if minor not in used_minors and must_exist:
798             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
799                         " not active" % (minor, iname))
800             bad = True
801         for minor in used_minors:
802           if minor not in drbd_map:
803             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
804                         minor)
805             bad = True
806
807     return bad
808
809   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
810                       node_instance, feedback_fn, n_offline):
811     """Verify an instance.
812
813     This function checks to see if the required block devices are
814     available on the instance's node.
815
816     """
817     bad = False
818
819     node_current = instanceconfig.primary_node
820
821     node_vol_should = {}
822     instanceconfig.MapLVsByNode(node_vol_should)
823
824     for node in node_vol_should:
825       if node in n_offline:
826         # ignore missing volumes on offline nodes
827         continue
828       for volume in node_vol_should[node]:
829         if node not in node_vol_is or volume not in node_vol_is[node]:
830           feedback_fn("  - ERROR: volume %s missing on node %s" %
831                           (volume, node))
832           bad = True
833
834     if instanceconfig.admin_up:
835       if ((node_current not in node_instance or
836           not instance in node_instance[node_current]) and
837           node_current not in n_offline):
838         feedback_fn("  - ERROR: instance %s not running on node %s" %
839                         (instance, node_current))
840         bad = True
841
842     for node in node_instance:
843       if (not node == node_current):
844         if instance in node_instance[node]:
845           feedback_fn("  - ERROR: instance %s should not run on node %s" %
846                           (instance, node))
847           bad = True
848
849     return bad
850
851   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
852     """Verify if there are any unknown volumes in the cluster.
853
854     The .os, .swap and backup volumes are ignored. All other volumes are
855     reported as unknown.
856
857     """
858     bad = False
859
860     for node in node_vol_is:
861       for volume in node_vol_is[node]:
862         if node not in node_vol_should or volume not in node_vol_should[node]:
863           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
864                       (volume, node))
865           bad = True
866     return bad
867
868   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
869     """Verify the list of running instances.
870
871     This checks what instances are running but unknown to the cluster.
872
873     """
874     bad = False
875     for node in node_instance:
876       for runninginstance in node_instance[node]:
877         if runninginstance not in instancelist:
878           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
879                           (runninginstance, node))
880           bad = True
881     return bad
882
883   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
884     """Verify N+1 Memory Resilience.
885
886     Check that if one single node dies we can still start all the instances it
887     was primary for.
888
889     """
890     bad = False
891
892     for node, nodeinfo in node_info.iteritems():
893       # This code checks that every node which is now listed as secondary has
894       # enough memory to host all instances it is supposed to should a single
895       # other node in the cluster fail.
896       # FIXME: not ready for failover to an arbitrary node
897       # FIXME: does not support file-backed instances
898       # WARNING: we currently take into account down instances as well as up
899       # ones, considering that even if they're down someone might want to start
900       # them even in the event of a node failure.
901       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
902         needed_mem = 0
903         for instance in instances:
904           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
905           if bep[constants.BE_AUTO_BALANCE]:
906             needed_mem += bep[constants.BE_MEMORY]
907         if nodeinfo['mfree'] < needed_mem:
908           feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
909                       " failovers should node %s fail" % (node, prinode))
910           bad = True
911     return bad
912
913   def CheckPrereq(self):
914     """Check prerequisites.
915
916     Transform the list of checks we're going to skip into a set and check that
917     all its members are valid.
918
919     """
920     self.skip_set = frozenset(self.op.skip_checks)
921     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
922       raise errors.OpPrereqError("Invalid checks to be skipped specified")
923
924   def BuildHooksEnv(self):
925     """Build hooks env.
926
927     Cluster-Verify hooks just ran in the post phase and their failure makes
928     the output be logged in the verify output and the verification to fail.
929
930     """
931     all_nodes = self.cfg.GetNodeList()
932     env = {
933       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
934       }
935     for node in self.cfg.GetAllNodesInfo().values():
936       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
937
938     return env, [], all_nodes
939
940   def Exec(self, feedback_fn):
941     """Verify integrity of cluster, performing various test on nodes.
942
943     """
944     bad = False
945     feedback_fn("* Verifying global settings")
946     for msg in self.cfg.VerifyConfig():
947       feedback_fn("  - ERROR: %s" % msg)
948
949     vg_name = self.cfg.GetVGName()
950     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
951     nodelist = utils.NiceSort(self.cfg.GetNodeList())
952     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
953     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
954     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
955                         for iname in instancelist)
956     i_non_redundant = [] # Non redundant instances
957     i_non_a_balanced = [] # Non auto-balanced instances
958     n_offline = [] # List of offline nodes
959     n_drained = [] # List of nodes being drained
960     node_volume = {}
961     node_instance = {}
962     node_info = {}
963     instance_cfg = {}
964
965     # FIXME: verify OS list
966     # do local checksums
967     master_files = [constants.CLUSTER_CONF_FILE]
968
969     file_names = ssconf.SimpleStore().GetFileList()
970     file_names.append(constants.SSL_CERT_FILE)
971     file_names.append(constants.RAPI_CERT_FILE)
972     file_names.extend(master_files)
973
974     local_checksums = utils.FingerprintFiles(file_names)
975
976     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
977     node_verify_param = {
978       constants.NV_FILELIST: file_names,
979       constants.NV_NODELIST: [node.name for node in nodeinfo
980                               if not node.offline],
981       constants.NV_HYPERVISOR: hypervisors,
982       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
983                                   node.secondary_ip) for node in nodeinfo
984                                  if not node.offline],
985       constants.NV_INSTANCELIST: hypervisors,
986       constants.NV_VERSION: None,
987       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
988       }
989     if vg_name is not None:
990       node_verify_param[constants.NV_VGLIST] = None
991       node_verify_param[constants.NV_LVLIST] = vg_name
992       node_verify_param[constants.NV_DRBDLIST] = None
993     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
994                                            self.cfg.GetClusterName())
995
996     cluster = self.cfg.GetClusterInfo()
997     master_node = self.cfg.GetMasterNode()
998     all_drbd_map = self.cfg.ComputeDRBDMap()
999
1000     for node_i in nodeinfo:
1001       node = node_i.name
1002       nresult = all_nvinfo[node].data
1003
1004       if node_i.offline:
1005         feedback_fn("* Skipping offline node %s" % (node,))
1006         n_offline.append(node)
1007         continue
1008
1009       if node == master_node:
1010         ntype = "master"
1011       elif node_i.master_candidate:
1012         ntype = "master candidate"
1013       elif node_i.drained:
1014         ntype = "drained"
1015         n_drained.append(node)
1016       else:
1017         ntype = "regular"
1018       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1019
1020       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1021         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1022         bad = True
1023         continue
1024
1025       node_drbd = {}
1026       for minor, instance in all_drbd_map[node].items():
1027         if instance not in instanceinfo:
1028           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1029                       instance)
1030           # ghost instance should not be running, but otherwise we
1031           # don't give double warnings (both ghost instance and
1032           # unallocated minor in use)
1033           node_drbd[minor] = (instance, False)
1034         else:
1035           instance = instanceinfo[instance]
1036           node_drbd[minor] = (instance.name, instance.admin_up)
1037       result = self._VerifyNode(node_i, file_names, local_checksums,
1038                                 nresult, feedback_fn, master_files,
1039                                 node_drbd, vg_name)
1040       bad = bad or result
1041
1042       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1043       if vg_name is None:
1044         node_volume[node] = {}
1045       elif isinstance(lvdata, basestring):
1046         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1047                     (node, utils.SafeEncode(lvdata)))
1048         bad = True
1049         node_volume[node] = {}
1050       elif not isinstance(lvdata, dict):
1051         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1052         bad = True
1053         continue
1054       else:
1055         node_volume[node] = lvdata
1056
1057       # node_instance
1058       idata = nresult.get(constants.NV_INSTANCELIST, None)
1059       if not isinstance(idata, list):
1060         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1061                     (node,))
1062         bad = True
1063         continue
1064
1065       node_instance[node] = idata
1066
1067       # node_info
1068       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1069       if not isinstance(nodeinfo, dict):
1070         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1071         bad = True
1072         continue
1073
1074       try:
1075         node_info[node] = {
1076           "mfree": int(nodeinfo['memory_free']),
1077           "pinst": [],
1078           "sinst": [],
1079           # dictionary holding all instances this node is secondary for,
1080           # grouped by their primary node. Each key is a cluster node, and each
1081           # value is a list of instances which have the key as primary and the
1082           # current node as secondary.  this is handy to calculate N+1 memory
1083           # availability if you can only failover from a primary to its
1084           # secondary.
1085           "sinst-by-pnode": {},
1086         }
1087         # FIXME: devise a free space model for file based instances as well
1088         if vg_name is not None:
1089           if (constants.NV_VGLIST not in nresult or
1090               vg_name not in nresult[constants.NV_VGLIST]):
1091             feedback_fn("  - ERROR: node %s didn't return data for the"
1092                         " volume group '%s' - it is either missing or broken" %
1093                         (node, vg_name))
1094             bad = True
1095             continue
1096           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1097       except (ValueError, KeyError):
1098         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1099                     " from node %s" % (node,))
1100         bad = True
1101         continue
1102
1103     node_vol_should = {}
1104
1105     for instance in instancelist:
1106       feedback_fn("* Verifying instance %s" % instance)
1107       inst_config = instanceinfo[instance]
1108       result =  self._VerifyInstance(instance, inst_config, node_volume,
1109                                      node_instance, feedback_fn, n_offline)
1110       bad = bad or result
1111       inst_nodes_offline = []
1112
1113       inst_config.MapLVsByNode(node_vol_should)
1114
1115       instance_cfg[instance] = inst_config
1116
1117       pnode = inst_config.primary_node
1118       if pnode in node_info:
1119         node_info[pnode]['pinst'].append(instance)
1120       elif pnode not in n_offline:
1121         feedback_fn("  - ERROR: instance %s, connection to primary node"
1122                     " %s failed" % (instance, pnode))
1123         bad = True
1124
1125       if pnode in n_offline:
1126         inst_nodes_offline.append(pnode)
1127
1128       # If the instance is non-redundant we cannot survive losing its primary
1129       # node, so we are not N+1 compliant. On the other hand we have no disk
1130       # templates with more than one secondary so that situation is not well
1131       # supported either.
1132       # FIXME: does not support file-backed instances
1133       if len(inst_config.secondary_nodes) == 0:
1134         i_non_redundant.append(instance)
1135       elif len(inst_config.secondary_nodes) > 1:
1136         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1137                     % instance)
1138
1139       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1140         i_non_a_balanced.append(instance)
1141
1142       for snode in inst_config.secondary_nodes:
1143         if snode in node_info:
1144           node_info[snode]['sinst'].append(instance)
1145           if pnode not in node_info[snode]['sinst-by-pnode']:
1146             node_info[snode]['sinst-by-pnode'][pnode] = []
1147           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1148         elif snode not in n_offline:
1149           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1150                       " %s failed" % (instance, snode))
1151           bad = True
1152         if snode in n_offline:
1153           inst_nodes_offline.append(snode)
1154
1155       if inst_nodes_offline:
1156         # warn that the instance lives on offline nodes, and set bad=True
1157         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1158                     ", ".join(inst_nodes_offline))
1159         bad = True
1160
1161     feedback_fn("* Verifying orphan volumes")
1162     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1163                                        feedback_fn)
1164     bad = bad or result
1165
1166     feedback_fn("* Verifying remaining instances")
1167     result = self._VerifyOrphanInstances(instancelist, node_instance,
1168                                          feedback_fn)
1169     bad = bad or result
1170
1171     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1172       feedback_fn("* Verifying N+1 Memory redundancy")
1173       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1174       bad = bad or result
1175
1176     feedback_fn("* Other Notes")
1177     if i_non_redundant:
1178       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1179                   % len(i_non_redundant))
1180
1181     if i_non_a_balanced:
1182       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1183                   % len(i_non_a_balanced))
1184
1185     if n_offline:
1186       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1187
1188     if n_drained:
1189       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1190
1191     return not bad
1192
1193   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1194     """Analyze the post-hooks' result
1195
1196     This method analyses the hook result, handles it, and sends some
1197     nicely-formatted feedback back to the user.
1198
1199     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1200         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1201     @param hooks_results: the results of the multi-node hooks rpc call
1202     @param feedback_fn: function used send feedback back to the caller
1203     @param lu_result: previous Exec result
1204     @return: the new Exec result, based on the previous result
1205         and hook results
1206
1207     """
1208     # We only really run POST phase hooks, and are only interested in
1209     # their results
1210     if phase == constants.HOOKS_PHASE_POST:
1211       # Used to change hooks' output to proper indentation
1212       indent_re = re.compile('^', re.M)
1213       feedback_fn("* Hooks Results")
1214       if not hooks_results:
1215         feedback_fn("  - ERROR: general communication failure")
1216         lu_result = 1
1217       else:
1218         for node_name in hooks_results:
1219           show_node_header = True
1220           res = hooks_results[node_name]
1221           if res.failed or res.data is False or not isinstance(res.data, list):
1222             if res.offline:
1223               # no need to warn or set fail return value
1224               continue
1225             feedback_fn("    Communication failure in hooks execution")
1226             lu_result = 1
1227             continue
1228           for script, hkr, output in res.data:
1229             if hkr == constants.HKR_FAIL:
1230               # The node header is only shown once, if there are
1231               # failing hooks on that node
1232               if show_node_header:
1233                 feedback_fn("  Node %s:" % node_name)
1234                 show_node_header = False
1235               feedback_fn("    ERROR: Script %s failed, output:" % script)
1236               output = indent_re.sub('      ', output)
1237               feedback_fn("%s" % output)
1238               lu_result = 1
1239
1240       return lu_result
1241
1242
1243 class LUVerifyDisks(NoHooksLU):
1244   """Verifies the cluster disks status.
1245
1246   """
1247   _OP_REQP = []
1248   REQ_BGL = False
1249
1250   def ExpandNames(self):
1251     self.needed_locks = {
1252       locking.LEVEL_NODE: locking.ALL_SET,
1253       locking.LEVEL_INSTANCE: locking.ALL_SET,
1254     }
1255     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1256
1257   def CheckPrereq(self):
1258     """Check prerequisites.
1259
1260     This has no prerequisites.
1261
1262     """
1263     pass
1264
1265   def Exec(self, feedback_fn):
1266     """Verify integrity of cluster disks.
1267
1268     """
1269     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1270
1271     vg_name = self.cfg.GetVGName()
1272     nodes = utils.NiceSort(self.cfg.GetNodeList())
1273     instances = [self.cfg.GetInstanceInfo(name)
1274                  for name in self.cfg.GetInstanceList()]
1275
1276     nv_dict = {}
1277     for inst in instances:
1278       inst_lvs = {}
1279       if (not inst.admin_up or
1280           inst.disk_template not in constants.DTS_NET_MIRROR):
1281         continue
1282       inst.MapLVsByNode(inst_lvs)
1283       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1284       for node, vol_list in inst_lvs.iteritems():
1285         for vol in vol_list:
1286           nv_dict[(node, vol)] = inst
1287
1288     if not nv_dict:
1289       return result
1290
1291     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1292
1293     for node in nodes:
1294       # node_volume
1295       lvs = node_lvs[node]
1296       if lvs.failed:
1297         if not lvs.offline:
1298           self.LogWarning("Connection to node %s failed: %s" %
1299                           (node, lvs.data))
1300         continue
1301       lvs = lvs.data
1302       if isinstance(lvs, basestring):
1303         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1304         res_nlvm[node] = lvs
1305         continue
1306       elif not isinstance(lvs, dict):
1307         logging.warning("Connection to node %s failed or invalid data"
1308                         " returned", node)
1309         res_nodes.append(node)
1310         continue
1311
1312       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1313         inst = nv_dict.pop((node, lv_name), None)
1314         if (not lv_online and inst is not None
1315             and inst.name not in res_instances):
1316           res_instances.append(inst.name)
1317
1318     # any leftover items in nv_dict are missing LVs, let's arrange the
1319     # data better
1320     for key, inst in nv_dict.iteritems():
1321       if inst.name not in res_missing:
1322         res_missing[inst.name] = []
1323       res_missing[inst.name].append(key)
1324
1325     return result
1326
1327
1328 class LURepairDiskSizes(NoHooksLU):
1329   """Verifies the cluster disks sizes.
1330
1331   """
1332   _OP_REQP = ["instances"]
1333   REQ_BGL = False
1334
1335   def ExpandNames(self):
1336
1337     if not isinstance(self.op.instances, list):
1338       raise errors.OpPrereqError("Invalid argument type 'instances'")
1339
1340     if self.op.instances:
1341       self.wanted_names = []
1342       for name in self.op.instances:
1343         full_name = self.cfg.ExpandInstanceName(name)
1344         if full_name is None:
1345           raise errors.OpPrereqError("Instance '%s' not known" % name)
1346         self.wanted_names.append(full_name)
1347       self.needed_locks = {
1348         locking.LEVEL_NODE: [],
1349         locking.LEVEL_INSTANCE: self.wanted_names,
1350         }
1351       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1352     else:
1353       self.wanted_names = None
1354       self.needed_locks = {
1355         locking.LEVEL_NODE: locking.ALL_SET,
1356         locking.LEVEL_INSTANCE: locking.ALL_SET,
1357         }
1358     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1359
1360   def DeclareLocks(self, level):
1361     if level == locking.LEVEL_NODE and self.wanted_names is not None:
1362       self._LockInstancesNodes(primary_only=True)
1363
1364   def CheckPrereq(self):
1365     """Check prerequisites.
1366
1367     This only checks the optional instance list against the existing names.
1368
1369     """
1370     if self.wanted_names is None:
1371       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
1372
1373     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
1374                              in self.wanted_names]
1375
1376   def _EnsureChildSizes(self, disk):
1377     """Ensure children of the disk have the needed disk size.
1378
1379     This is valid mainly for DRBD8 and fixes an issue where the
1380     children have smaller disk size.
1381
1382     @param disk: an L{ganeti.objects.Disk} object
1383
1384     """
1385     if disk.dev_type == constants.LD_DRBD8:
1386       assert disk.children, "Empty children for DRBD8?"
1387       fchild = disk.children[0]
1388       mismatch = fchild.size < disk.size
1389       if mismatch:
1390         self.LogInfo("Child disk has size %d, parent %d, fixing",
1391                      fchild.size, disk.size)
1392         fchild.size = disk.size
1393
1394       # and we recurse on this child only, not on the metadev
1395       return self._EnsureChildSizes(fchild) or mismatch
1396     else:
1397       return False
1398
1399   def Exec(self, feedback_fn):
1400     """Verify the size of cluster disks.
1401
1402     """
1403     # TODO: check child disks too
1404     # TODO: check differences in size between primary/secondary nodes
1405     per_node_disks = {}
1406     for instance in self.wanted_instances:
1407       pnode = instance.primary_node
1408       if pnode not in per_node_disks:
1409         per_node_disks[pnode] = []
1410       for idx, disk in enumerate(instance.disks):
1411         per_node_disks[pnode].append((instance, idx, disk))
1412
1413     changed = []
1414     for node, dskl in per_node_disks.items():
1415       newl = [v[2].Copy() for v in dskl]
1416       for dsk in newl:
1417         self.cfg.SetDiskID(dsk, node)
1418       result = self.rpc.call_blockdev_getsizes(node, newl)
1419       if result.failed:
1420         self.LogWarning("Failure in blockdev_getsizes call to node"
1421                         " %s, ignoring", node)
1422         continue
1423       if len(result.data) != len(dskl):
1424         self.LogWarning("Invalid result from node %s, ignoring node results",
1425                         node)
1426         continue
1427       for ((instance, idx, disk), size) in zip(dskl, result.data):
1428         if size is None:
1429           self.LogWarning("Disk %d of instance %s did not return size"
1430                           " information, ignoring", idx, instance.name)
1431           continue
1432         if not isinstance(size, (int, long)):
1433           self.LogWarning("Disk %d of instance %s did not return valid"
1434                           " size information, ignoring", idx, instance.name)
1435           continue
1436         size = size >> 20
1437         if size != disk.size:
1438           self.LogInfo("Disk %d of instance %s has mismatched size,"
1439                        " correcting: recorded %d, actual %d", idx,
1440                        instance.name, disk.size, size)
1441           disk.size = size
1442           self.cfg.Update(instance)
1443           changed.append((instance.name, idx, size))
1444         if self._EnsureChildSizes(disk):
1445           self.cfg.Update(instance)
1446           changed.append((instance.name, idx, disk.size))
1447     return changed
1448
1449
1450 class LURenameCluster(LogicalUnit):
1451   """Rename the cluster.
1452
1453   """
1454   HPATH = "cluster-rename"
1455   HTYPE = constants.HTYPE_CLUSTER
1456   _OP_REQP = ["name"]
1457
1458   def BuildHooksEnv(self):
1459     """Build hooks env.
1460
1461     """
1462     env = {
1463       "OP_TARGET": self.cfg.GetClusterName(),
1464       "NEW_NAME": self.op.name,
1465       }
1466     mn = self.cfg.GetMasterNode()
1467     return env, [mn], [mn]
1468
1469   def CheckPrereq(self):
1470     """Verify that the passed name is a valid one.
1471
1472     """
1473     hostname = utils.HostInfo(self.op.name)
1474
1475     new_name = hostname.name
1476     self.ip = new_ip = hostname.ip
1477     old_name = self.cfg.GetClusterName()
1478     old_ip = self.cfg.GetMasterIP()
1479     if new_name == old_name and new_ip == old_ip:
1480       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1481                                  " cluster has changed")
1482     if new_ip != old_ip:
1483       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1484         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1485                                    " reachable on the network. Aborting." %
1486                                    new_ip)
1487
1488     self.op.name = new_name
1489
1490   def Exec(self, feedback_fn):
1491     """Rename the cluster.
1492
1493     """
1494     clustername = self.op.name
1495     ip = self.ip
1496
1497     # shutdown the master IP
1498     master = self.cfg.GetMasterNode()
1499     result = self.rpc.call_node_stop_master(master, False)
1500     if result.failed or not result.data:
1501       raise errors.OpExecError("Could not disable the master role")
1502
1503     try:
1504       cluster = self.cfg.GetClusterInfo()
1505       cluster.cluster_name = clustername
1506       cluster.master_ip = ip
1507       self.cfg.Update(cluster)
1508
1509       # update the known hosts file
1510       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1511       node_list = self.cfg.GetNodeList()
1512       try:
1513         node_list.remove(master)
1514       except ValueError:
1515         pass
1516       result = self.rpc.call_upload_file(node_list,
1517                                          constants.SSH_KNOWN_HOSTS_FILE)
1518       for to_node, to_result in result.iteritems():
1519         if to_result.failed or not to_result.data:
1520           logging.error("Copy of file %s to node %s failed",
1521                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
1522
1523     finally:
1524       result = self.rpc.call_node_start_master(master, False, False)
1525       if result.failed or not result.data:
1526         self.LogWarning("Could not re-enable the master role on"
1527                         " the master, please restart manually.")
1528
1529
1530 def _RecursiveCheckIfLVMBased(disk):
1531   """Check if the given disk or its children are lvm-based.
1532
1533   @type disk: L{objects.Disk}
1534   @param disk: the disk to check
1535   @rtype: boolean
1536   @return: boolean indicating whether a LD_LV dev_type was found or not
1537
1538   """
1539   if disk.children:
1540     for chdisk in disk.children:
1541       if _RecursiveCheckIfLVMBased(chdisk):
1542         return True
1543   return disk.dev_type == constants.LD_LV
1544
1545
1546 class LUSetClusterParams(LogicalUnit):
1547   """Change the parameters of the cluster.
1548
1549   """
1550   HPATH = "cluster-modify"
1551   HTYPE = constants.HTYPE_CLUSTER
1552   _OP_REQP = []
1553   REQ_BGL = False
1554
1555   def CheckArguments(self):
1556     """Check parameters
1557
1558     """
1559     if not hasattr(self.op, "candidate_pool_size"):
1560       self.op.candidate_pool_size = None
1561     if self.op.candidate_pool_size is not None:
1562       try:
1563         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1564       except (ValueError, TypeError), err:
1565         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1566                                    str(err))
1567       if self.op.candidate_pool_size < 1:
1568         raise errors.OpPrereqError("At least one master candidate needed")
1569
1570   def ExpandNames(self):
1571     # FIXME: in the future maybe other cluster params won't require checking on
1572     # all nodes to be modified.
1573     self.needed_locks = {
1574       locking.LEVEL_NODE: locking.ALL_SET,
1575     }
1576     self.share_locks[locking.LEVEL_NODE] = 1
1577
1578   def BuildHooksEnv(self):
1579     """Build hooks env.
1580
1581     """
1582     env = {
1583       "OP_TARGET": self.cfg.GetClusterName(),
1584       "NEW_VG_NAME": self.op.vg_name,
1585       }
1586     mn = self.cfg.GetMasterNode()
1587     return env, [mn], [mn]
1588
1589   def CheckPrereq(self):
1590     """Check prerequisites.
1591
1592     This checks whether the given params don't conflict and
1593     if the given volume group is valid.
1594
1595     """
1596     if self.op.vg_name is not None and not self.op.vg_name:
1597       instances = self.cfg.GetAllInstancesInfo().values()
1598       for inst in instances:
1599         for disk in inst.disks:
1600           if _RecursiveCheckIfLVMBased(disk):
1601             raise errors.OpPrereqError("Cannot disable lvm storage while"
1602                                        " lvm-based instances exist")
1603
1604     node_list = self.acquired_locks[locking.LEVEL_NODE]
1605
1606     # if vg_name not None, checks given volume group on all nodes
1607     if self.op.vg_name:
1608       vglist = self.rpc.call_vg_list(node_list)
1609       for node in node_list:
1610         if vglist[node].failed:
1611           # ignoring down node
1612           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1613           continue
1614         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1615                                               self.op.vg_name,
1616                                               constants.MIN_VG_SIZE)
1617         if vgstatus:
1618           raise errors.OpPrereqError("Error on node '%s': %s" %
1619                                      (node, vgstatus))
1620
1621     self.cluster = cluster = self.cfg.GetClusterInfo()
1622     # validate beparams changes
1623     if self.op.beparams:
1624       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1625       self.new_beparams = cluster.FillDict(
1626         cluster.beparams[constants.BEGR_DEFAULT], self.op.beparams)
1627
1628     # hypervisor list/parameters
1629     self.new_hvparams = cluster.FillDict(cluster.hvparams, {})
1630     if self.op.hvparams:
1631       if not isinstance(self.op.hvparams, dict):
1632         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1633       for hv_name, hv_dict in self.op.hvparams.items():
1634         if hv_name not in self.new_hvparams:
1635           self.new_hvparams[hv_name] = hv_dict
1636         else:
1637           self.new_hvparams[hv_name].update(hv_dict)
1638
1639     if self.op.enabled_hypervisors is not None:
1640       self.hv_list = self.op.enabled_hypervisors
1641       if not self.hv_list:
1642         raise errors.OpPrereqError("Enabled hypervisors list must contain at"
1643                                    " least one member")
1644       invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
1645       if invalid_hvs:
1646         raise errors.OpPrereqError("Enabled hypervisors contains invalid"
1647                                    " entries: %s" % invalid_hvs)
1648     else:
1649       self.hv_list = cluster.enabled_hypervisors
1650
1651     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1652       # either the enabled list has changed, or the parameters have, validate
1653       for hv_name, hv_params in self.new_hvparams.items():
1654         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1655             (self.op.enabled_hypervisors and
1656              hv_name in self.op.enabled_hypervisors)):
1657           # either this is a new hypervisor, or its parameters have changed
1658           hv_class = hypervisor.GetHypervisor(hv_name)
1659           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1660           hv_class.CheckParameterSyntax(hv_params)
1661           _CheckHVParams(self, node_list, hv_name, hv_params)
1662
1663   def Exec(self, feedback_fn):
1664     """Change the parameters of the cluster.
1665
1666     """
1667     if self.op.vg_name is not None:
1668       new_volume = self.op.vg_name
1669       if not new_volume:
1670         new_volume = None
1671       if new_volume != self.cfg.GetVGName():
1672         self.cfg.SetVGName(new_volume)
1673       else:
1674         feedback_fn("Cluster LVM configuration already in desired"
1675                     " state, not changing")
1676     if self.op.hvparams:
1677       self.cluster.hvparams = self.new_hvparams
1678     if self.op.enabled_hypervisors is not None:
1679       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1680     if self.op.beparams:
1681       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
1682     if self.op.candidate_pool_size is not None:
1683       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1684       # we need to update the pool size here, otherwise the save will fail
1685       _AdjustCandidatePool(self)
1686
1687     self.cfg.Update(self.cluster)
1688
1689
1690 class LURedistributeConfig(NoHooksLU):
1691   """Force the redistribution of cluster configuration.
1692
1693   This is a very simple LU.
1694
1695   """
1696   _OP_REQP = []
1697   REQ_BGL = False
1698
1699   def ExpandNames(self):
1700     self.needed_locks = {
1701       locking.LEVEL_NODE: locking.ALL_SET,
1702     }
1703     self.share_locks[locking.LEVEL_NODE] = 1
1704
1705   def CheckPrereq(self):
1706     """Check prerequisites.
1707
1708     """
1709
1710   def Exec(self, feedback_fn):
1711     """Redistribute the configuration.
1712
1713     """
1714     self.cfg.Update(self.cfg.GetClusterInfo())
1715
1716
1717 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1718   """Sleep and poll for an instance's disk to sync.
1719
1720   """
1721   if not instance.disks:
1722     return True
1723
1724   if not oneshot:
1725     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1726
1727   node = instance.primary_node
1728
1729   for dev in instance.disks:
1730     lu.cfg.SetDiskID(dev, node)
1731
1732   retries = 0
1733   degr_retries = 10 # in seconds, as we sleep 1 second each time
1734   while True:
1735     max_time = 0
1736     done = True
1737     cumul_degraded = False
1738     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1739     if rstats.failed or not rstats.data:
1740       lu.LogWarning("Can't get any data from node %s", node)
1741       retries += 1
1742       if retries >= 10:
1743         raise errors.RemoteError("Can't contact node %s for mirror data,"
1744                                  " aborting." % node)
1745       time.sleep(6)
1746       continue
1747     rstats = rstats.data
1748     retries = 0
1749     for i, mstat in enumerate(rstats):
1750       if mstat is None:
1751         lu.LogWarning("Can't compute data for node %s/%s",
1752                            node, instance.disks[i].iv_name)
1753         continue
1754       # we ignore the ldisk parameter
1755       perc_done, est_time, is_degraded, _ = mstat
1756       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1757       if perc_done is not None:
1758         done = False
1759         if est_time is not None:
1760           rem_time = "%d estimated seconds remaining" % est_time
1761           max_time = est_time
1762         else:
1763           rem_time = "no time estimate"
1764         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1765                         (instance.disks[i].iv_name, perc_done, rem_time))
1766
1767     # if we're done but degraded, let's do a few small retries, to
1768     # make sure we see a stable and not transient situation; therefore
1769     # we force restart of the loop
1770     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1771       logging.info("Degraded disks found, %d retries left", degr_retries)
1772       degr_retries -= 1
1773       time.sleep(1)
1774       continue
1775
1776     if done or oneshot:
1777       break
1778
1779     time.sleep(min(60, max_time))
1780
1781   if done:
1782     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1783   return not cumul_degraded
1784
1785
1786 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1787   """Check that mirrors are not degraded.
1788
1789   The ldisk parameter, if True, will change the test from the
1790   is_degraded attribute (which represents overall non-ok status for
1791   the device(s)) to the ldisk (representing the local storage status).
1792
1793   """
1794   lu.cfg.SetDiskID(dev, node)
1795   if ldisk:
1796     idx = 6
1797   else:
1798     idx = 5
1799
1800   result = True
1801   if on_primary or dev.AssembleOnSecondary():
1802     rstats = lu.rpc.call_blockdev_find(node, dev)
1803     msg = rstats.RemoteFailMsg()
1804     if msg:
1805       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1806       result = False
1807     elif not rstats.payload:
1808       lu.LogWarning("Can't find disk on node %s", node)
1809       result = False
1810     else:
1811       result = result and (not rstats.payload[idx])
1812   if dev.children:
1813     for child in dev.children:
1814       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1815
1816   return result
1817
1818
1819 class LUDiagnoseOS(NoHooksLU):
1820   """Logical unit for OS diagnose/query.
1821
1822   """
1823   _OP_REQP = ["output_fields", "names"]
1824   REQ_BGL = False
1825   _FIELDS_STATIC = utils.FieldSet()
1826   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1827
1828   def ExpandNames(self):
1829     if self.op.names:
1830       raise errors.OpPrereqError("Selective OS query not supported")
1831
1832     _CheckOutputFields(static=self._FIELDS_STATIC,
1833                        dynamic=self._FIELDS_DYNAMIC,
1834                        selected=self.op.output_fields)
1835
1836     # Lock all nodes, in shared mode
1837     # Temporary removal of locks, should be reverted later
1838     # TODO: reintroduce locks when they are lighter-weight
1839     self.needed_locks = {}
1840     #self.share_locks[locking.LEVEL_NODE] = 1
1841     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1842
1843   def CheckPrereq(self):
1844     """Check prerequisites.
1845
1846     """
1847
1848   @staticmethod
1849   def _DiagnoseByOS(node_list, rlist):
1850     """Remaps a per-node return list into an a per-os per-node dictionary
1851
1852     @param node_list: a list with the names of all nodes
1853     @param rlist: a map with node names as keys and OS objects as values
1854
1855     @rtype: dict
1856     @return: a dictionary with osnames as keys and as value another map, with
1857         nodes as keys and list of OS objects as values, eg::
1858
1859           {"debian-etch": {"node1": [<object>,...],
1860                            "node2": [<object>,]}
1861           }
1862
1863     """
1864     all_os = {}
1865     # we build here the list of nodes that didn't fail the RPC (at RPC
1866     # level), so that nodes with a non-responding node daemon don't
1867     # make all OSes invalid
1868     good_nodes = [node_name for node_name in rlist
1869                   if not rlist[node_name].failed]
1870     for node_name, nr in rlist.iteritems():
1871       if nr.failed or not nr.data:
1872         continue
1873       for os_obj in nr.data:
1874         if os_obj.name not in all_os:
1875           # build a list of nodes for this os containing empty lists
1876           # for each node in node_list
1877           all_os[os_obj.name] = {}
1878           for nname in good_nodes:
1879             all_os[os_obj.name][nname] = []
1880         all_os[os_obj.name][node_name].append(os_obj)
1881     return all_os
1882
1883   def Exec(self, feedback_fn):
1884     """Compute the list of OSes.
1885
1886     """
1887     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1888     node_data = self.rpc.call_os_diagnose(valid_nodes)
1889     if node_data == False:
1890       raise errors.OpExecError("Can't gather the list of OSes")
1891     pol = self._DiagnoseByOS(valid_nodes, node_data)
1892     output = []
1893     for os_name, os_data in pol.iteritems():
1894       row = []
1895       for field in self.op.output_fields:
1896         if field == "name":
1897           val = os_name
1898         elif field == "valid":
1899           val = utils.all([osl and osl[0] for osl in os_data.values()])
1900         elif field == "node_status":
1901           val = {}
1902           for node_name, nos_list in os_data.iteritems():
1903             val[node_name] = [(v.status, v.path) for v in nos_list]
1904         else:
1905           raise errors.ParameterError(field)
1906         row.append(val)
1907       output.append(row)
1908
1909     return output
1910
1911
1912 class LURemoveNode(LogicalUnit):
1913   """Logical unit for removing a node.
1914
1915   """
1916   HPATH = "node-remove"
1917   HTYPE = constants.HTYPE_NODE
1918   _OP_REQP = ["node_name"]
1919
1920   def BuildHooksEnv(self):
1921     """Build hooks env.
1922
1923     This doesn't run on the target node in the pre phase as a failed
1924     node would then be impossible to remove.
1925
1926     """
1927     env = {
1928       "OP_TARGET": self.op.node_name,
1929       "NODE_NAME": self.op.node_name,
1930       }
1931     all_nodes = self.cfg.GetNodeList()
1932     all_nodes.remove(self.op.node_name)
1933     return env, all_nodes, all_nodes
1934
1935   def CheckPrereq(self):
1936     """Check prerequisites.
1937
1938     This checks:
1939      - the node exists in the configuration
1940      - it does not have primary or secondary instances
1941      - it's not the master
1942
1943     Any errors are signaled by raising errors.OpPrereqError.
1944
1945     """
1946     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1947     if node is None:
1948       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1949
1950     instance_list = self.cfg.GetInstanceList()
1951
1952     masternode = self.cfg.GetMasterNode()
1953     if node.name == masternode:
1954       raise errors.OpPrereqError("Node is the master node,"
1955                                  " you need to failover first.")
1956
1957     for instance_name in instance_list:
1958       instance = self.cfg.GetInstanceInfo(instance_name)
1959       if node.name in instance.all_nodes:
1960         raise errors.OpPrereqError("Instance %s is still running on the node,"
1961                                    " please remove first." % instance_name)
1962     self.op.node_name = node.name
1963     self.node = node
1964
1965   def Exec(self, feedback_fn):
1966     """Removes the node from the cluster.
1967
1968     """
1969     node = self.node
1970     logging.info("Stopping the node daemon and removing configs from node %s",
1971                  node.name)
1972
1973     self.context.RemoveNode(node.name)
1974
1975     self.rpc.call_node_leave_cluster(node.name)
1976
1977     # Promote nodes to master candidate as needed
1978     _AdjustCandidatePool(self)
1979
1980
1981 class LUQueryNodes(NoHooksLU):
1982   """Logical unit for querying nodes.
1983
1984   """
1985   _OP_REQP = ["output_fields", "names", "use_locking"]
1986   REQ_BGL = False
1987   _FIELDS_DYNAMIC = utils.FieldSet(
1988     "dtotal", "dfree",
1989     "mtotal", "mnode", "mfree",
1990     "bootid",
1991     "ctotal", "cnodes", "csockets",
1992     )
1993
1994   _FIELDS_STATIC = utils.FieldSet(
1995     "name", "pinst_cnt", "sinst_cnt",
1996     "pinst_list", "sinst_list",
1997     "pip", "sip", "tags",
1998     "serial_no",
1999     "master_candidate",
2000     "master",
2001     "offline",
2002     "drained",
2003     "role",
2004     )
2005
2006   def ExpandNames(self):
2007     _CheckOutputFields(static=self._FIELDS_STATIC,
2008                        dynamic=self._FIELDS_DYNAMIC,
2009                        selected=self.op.output_fields)
2010
2011     self.needed_locks = {}
2012     self.share_locks[locking.LEVEL_NODE] = 1
2013
2014     if self.op.names:
2015       self.wanted = _GetWantedNodes(self, self.op.names)
2016     else:
2017       self.wanted = locking.ALL_SET
2018
2019     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
2020     self.do_locking = self.do_node_query and self.op.use_locking
2021     if self.do_locking:
2022       # if we don't request only static fields, we need to lock the nodes
2023       self.needed_locks[locking.LEVEL_NODE] = self.wanted
2024
2025
2026   def CheckPrereq(self):
2027     """Check prerequisites.
2028
2029     """
2030     # The validation of the node list is done in the _GetWantedNodes,
2031     # if non empty, and if empty, there's no validation to do
2032     pass
2033
2034   def Exec(self, feedback_fn):
2035     """Computes the list of nodes and their attributes.
2036
2037     """
2038     all_info = self.cfg.GetAllNodesInfo()
2039     if self.do_locking:
2040       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2041     elif self.wanted != locking.ALL_SET:
2042       nodenames = self.wanted
2043       missing = set(nodenames).difference(all_info.keys())
2044       if missing:
2045         raise errors.OpExecError(
2046           "Some nodes were removed before retrieving their data: %s" % missing)
2047     else:
2048       nodenames = all_info.keys()
2049
2050     nodenames = utils.NiceSort(nodenames)
2051     nodelist = [all_info[name] for name in nodenames]
2052
2053     # begin data gathering
2054
2055     if self.do_node_query:
2056       live_data = {}
2057       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2058                                           self.cfg.GetHypervisorType())
2059       for name in nodenames:
2060         nodeinfo = node_data[name]
2061         if not nodeinfo.failed and nodeinfo.data:
2062           nodeinfo = nodeinfo.data
2063           fn = utils.TryConvert
2064           live_data[name] = {
2065             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2066             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2067             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2068             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2069             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2070             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2071             "bootid": nodeinfo.get('bootid', None),
2072             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2073             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2074             }
2075         else:
2076           live_data[name] = {}
2077     else:
2078       live_data = dict.fromkeys(nodenames, {})
2079
2080     node_to_primary = dict([(name, set()) for name in nodenames])
2081     node_to_secondary = dict([(name, set()) for name in nodenames])
2082
2083     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2084                              "sinst_cnt", "sinst_list"))
2085     if inst_fields & frozenset(self.op.output_fields):
2086       instancelist = self.cfg.GetInstanceList()
2087
2088       for instance_name in instancelist:
2089         inst = self.cfg.GetInstanceInfo(instance_name)
2090         if inst.primary_node in node_to_primary:
2091           node_to_primary[inst.primary_node].add(inst.name)
2092         for secnode in inst.secondary_nodes:
2093           if secnode in node_to_secondary:
2094             node_to_secondary[secnode].add(inst.name)
2095
2096     master_node = self.cfg.GetMasterNode()
2097
2098     # end data gathering
2099
2100     output = []
2101     for node in nodelist:
2102       node_output = []
2103       for field in self.op.output_fields:
2104         if field == "name":
2105           val = node.name
2106         elif field == "pinst_list":
2107           val = list(node_to_primary[node.name])
2108         elif field == "sinst_list":
2109           val = list(node_to_secondary[node.name])
2110         elif field == "pinst_cnt":
2111           val = len(node_to_primary[node.name])
2112         elif field == "sinst_cnt":
2113           val = len(node_to_secondary[node.name])
2114         elif field == "pip":
2115           val = node.primary_ip
2116         elif field == "sip":
2117           val = node.secondary_ip
2118         elif field == "tags":
2119           val = list(node.GetTags())
2120         elif field == "serial_no":
2121           val = node.serial_no
2122         elif field == "master_candidate":
2123           val = node.master_candidate
2124         elif field == "master":
2125           val = node.name == master_node
2126         elif field == "offline":
2127           val = node.offline
2128         elif field == "drained":
2129           val = node.drained
2130         elif self._FIELDS_DYNAMIC.Matches(field):
2131           val = live_data[node.name].get(field, None)
2132         elif field == "role":
2133           if node.name == master_node:
2134             val = "M"
2135           elif node.master_candidate:
2136             val = "C"
2137           elif node.drained:
2138             val = "D"
2139           elif node.offline:
2140             val = "O"
2141           else:
2142             val = "R"
2143         else:
2144           raise errors.ParameterError(field)
2145         node_output.append(val)
2146       output.append(node_output)
2147
2148     return output
2149
2150
2151 class LUQueryNodeVolumes(NoHooksLU):
2152   """Logical unit for getting volumes on node(s).
2153
2154   """
2155   _OP_REQP = ["nodes", "output_fields"]
2156   REQ_BGL = False
2157   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2158   _FIELDS_STATIC = utils.FieldSet("node")
2159
2160   def ExpandNames(self):
2161     _CheckOutputFields(static=self._FIELDS_STATIC,
2162                        dynamic=self._FIELDS_DYNAMIC,
2163                        selected=self.op.output_fields)
2164
2165     self.needed_locks = {}
2166     self.share_locks[locking.LEVEL_NODE] = 1
2167     if not self.op.nodes:
2168       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2169     else:
2170       self.needed_locks[locking.LEVEL_NODE] = \
2171         _GetWantedNodes(self, self.op.nodes)
2172
2173   def CheckPrereq(self):
2174     """Check prerequisites.
2175
2176     This checks that the fields required are valid output fields.
2177
2178     """
2179     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2180
2181   def Exec(self, feedback_fn):
2182     """Computes the list of nodes and their attributes.
2183
2184     """
2185     nodenames = self.nodes
2186     volumes = self.rpc.call_node_volumes(nodenames)
2187
2188     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2189              in self.cfg.GetInstanceList()]
2190
2191     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2192
2193     output = []
2194     for node in nodenames:
2195       if node not in volumes or volumes[node].failed or not volumes[node].data:
2196         continue
2197
2198       node_vols = volumes[node].data[:]
2199       node_vols.sort(key=lambda vol: vol['dev'])
2200
2201       for vol in node_vols:
2202         node_output = []
2203         for field in self.op.output_fields:
2204           if field == "node":
2205             val = node
2206           elif field == "phys":
2207             val = vol['dev']
2208           elif field == "vg":
2209             val = vol['vg']
2210           elif field == "name":
2211             val = vol['name']
2212           elif field == "size":
2213             val = int(float(vol['size']))
2214           elif field == "instance":
2215             for inst in ilist:
2216               if node not in lv_by_node[inst]:
2217                 continue
2218               if vol['name'] in lv_by_node[inst][node]:
2219                 val = inst.name
2220                 break
2221             else:
2222               val = '-'
2223           else:
2224             raise errors.ParameterError(field)
2225           node_output.append(str(val))
2226
2227         output.append(node_output)
2228
2229     return output
2230
2231
2232 class LUAddNode(LogicalUnit):
2233   """Logical unit for adding node to the cluster.
2234
2235   """
2236   HPATH = "node-add"
2237   HTYPE = constants.HTYPE_NODE
2238   _OP_REQP = ["node_name"]
2239
2240   def BuildHooksEnv(self):
2241     """Build hooks env.
2242
2243     This will run on all nodes before, and on all nodes + the new node after.
2244
2245     """
2246     env = {
2247       "OP_TARGET": self.op.node_name,
2248       "NODE_NAME": self.op.node_name,
2249       "NODE_PIP": self.op.primary_ip,
2250       "NODE_SIP": self.op.secondary_ip,
2251       }
2252     nodes_0 = self.cfg.GetNodeList()
2253     nodes_1 = nodes_0 + [self.op.node_name, ]
2254     return env, nodes_0, nodes_1
2255
2256   def CheckPrereq(self):
2257     """Check prerequisites.
2258
2259     This checks:
2260      - the new node is not already in the config
2261      - it is resolvable
2262      - its parameters (single/dual homed) matches the cluster
2263
2264     Any errors are signaled by raising errors.OpPrereqError.
2265
2266     """
2267     node_name = self.op.node_name
2268     cfg = self.cfg
2269
2270     dns_data = utils.HostInfo(node_name)
2271
2272     node = dns_data.name
2273     primary_ip = self.op.primary_ip = dns_data.ip
2274     secondary_ip = getattr(self.op, "secondary_ip", None)
2275     if secondary_ip is None:
2276       secondary_ip = primary_ip
2277     if not utils.IsValidIP(secondary_ip):
2278       raise errors.OpPrereqError("Invalid secondary IP given")
2279     self.op.secondary_ip = secondary_ip
2280
2281     node_list = cfg.GetNodeList()
2282     if not self.op.readd and node in node_list:
2283       raise errors.OpPrereqError("Node %s is already in the configuration" %
2284                                  node)
2285     elif self.op.readd and node not in node_list:
2286       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2287
2288     for existing_node_name in node_list:
2289       existing_node = cfg.GetNodeInfo(existing_node_name)
2290
2291       if self.op.readd and node == existing_node_name:
2292         if (existing_node.primary_ip != primary_ip or
2293             existing_node.secondary_ip != secondary_ip):
2294           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2295                                      " address configuration as before")
2296         continue
2297
2298       if (existing_node.primary_ip == primary_ip or
2299           existing_node.secondary_ip == primary_ip or
2300           existing_node.primary_ip == secondary_ip or
2301           existing_node.secondary_ip == secondary_ip):
2302         raise errors.OpPrereqError("New node ip address(es) conflict with"
2303                                    " existing node %s" % existing_node.name)
2304
2305     # check that the type of the node (single versus dual homed) is the
2306     # same as for the master
2307     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2308     master_singlehomed = myself.secondary_ip == myself.primary_ip
2309     newbie_singlehomed = secondary_ip == primary_ip
2310     if master_singlehomed != newbie_singlehomed:
2311       if master_singlehomed:
2312         raise errors.OpPrereqError("The master has no private ip but the"
2313                                    " new node has one")
2314       else:
2315         raise errors.OpPrereqError("The master has a private ip but the"
2316                                    " new node doesn't have one")
2317
2318     # checks reachability
2319     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2320       raise errors.OpPrereqError("Node not reachable by ping")
2321
2322     if not newbie_singlehomed:
2323       # check reachability from my secondary ip to newbie's secondary ip
2324       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2325                            source=myself.secondary_ip):
2326         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2327                                    " based ping to noded port")
2328
2329     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2330     if self.op.readd:
2331       exceptions = [node]
2332     else:
2333       exceptions = []
2334     mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
2335     # the new node will increase mc_max with one, so:
2336     mc_max = min(mc_max + 1, cp_size)
2337     self.master_candidate = mc_now < mc_max
2338
2339     if self.op.readd:
2340       self.new_node = self.cfg.GetNodeInfo(node)
2341       assert self.new_node is not None, "Can't retrieve locked node %s" % node
2342     else:
2343       self.new_node = objects.Node(name=node,
2344                                    primary_ip=primary_ip,
2345                                    secondary_ip=secondary_ip,
2346                                    master_candidate=self.master_candidate,
2347                                    offline=False, drained=False)
2348
2349   def Exec(self, feedback_fn):
2350     """Adds the new node to the cluster.
2351
2352     """
2353     new_node = self.new_node
2354     node = new_node.name
2355
2356     # for re-adds, reset the offline/drained/master-candidate flags;
2357     # we need to reset here, otherwise offline would prevent RPC calls
2358     # later in the procedure; this also means that if the re-add
2359     # fails, we are left with a non-offlined, broken node
2360     if self.op.readd:
2361       new_node.drained = new_node.offline = False
2362       self.LogInfo("Readding a node, the offline/drained flags were reset")
2363       # if we demote the node, we do cleanup later in the procedure
2364       new_node.master_candidate = self.master_candidate
2365
2366     # notify the user about any possible mc promotion
2367     if new_node.master_candidate:
2368       self.LogInfo("Node will be a master candidate")
2369
2370     # check connectivity
2371     result = self.rpc.call_version([node])[node]
2372     result.Raise()
2373     if result.data:
2374       if constants.PROTOCOL_VERSION == result.data:
2375         logging.info("Communication to node %s fine, sw version %s match",
2376                      node, result.data)
2377       else:
2378         raise errors.OpExecError("Version mismatch master version %s,"
2379                                  " node version %s" %
2380                                  (constants.PROTOCOL_VERSION, result.data))
2381     else:
2382       raise errors.OpExecError("Cannot get version from the new node")
2383
2384     # setup ssh on node
2385     logging.info("Copy ssh key to node %s", node)
2386     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2387     keyarray = []
2388     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2389                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2390                 priv_key, pub_key]
2391
2392     for i in keyfiles:
2393       f = open(i, 'r')
2394       try:
2395         keyarray.append(f.read())
2396       finally:
2397         f.close()
2398
2399     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2400                                     keyarray[2],
2401                                     keyarray[3], keyarray[4], keyarray[5])
2402
2403     msg = result.RemoteFailMsg()
2404     if msg:
2405       raise errors.OpExecError("Cannot transfer ssh keys to the"
2406                                " new node: %s" % msg)
2407
2408     # Add node to our /etc/hosts, and add key to known_hosts
2409     if self.cfg.GetClusterInfo().modify_etc_hosts:
2410       utils.AddHostToEtcHosts(new_node.name)
2411
2412     if new_node.secondary_ip != new_node.primary_ip:
2413       result = self.rpc.call_node_has_ip_address(new_node.name,
2414                                                  new_node.secondary_ip)
2415       if result.failed or not result.data:
2416         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2417                                  " you gave (%s). Please fix and re-run this"
2418                                  " command." % new_node.secondary_ip)
2419
2420     node_verify_list = [self.cfg.GetMasterNode()]
2421     node_verify_param = {
2422       'nodelist': [node],
2423       # TODO: do a node-net-test as well?
2424     }
2425
2426     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2427                                        self.cfg.GetClusterName())
2428     for verifier in node_verify_list:
2429       if result[verifier].failed or not result[verifier].data:
2430         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2431                                  " for remote verification" % verifier)
2432       if result[verifier].data['nodelist']:
2433         for failed in result[verifier].data['nodelist']:
2434           feedback_fn("ssh/hostname verification failed"
2435                       " (checking from %s): %s" %
2436                       (verifier, result[verifier].data['nodelist'][failed]))
2437         raise errors.OpExecError("ssh/hostname verification failed.")
2438
2439     # Distribute updated /etc/hosts and known_hosts to all nodes,
2440     # including the node just added
2441     myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
2442     dist_nodes = self.cfg.GetNodeList()
2443     if not self.op.readd:
2444       dist_nodes.append(node)
2445     if myself.name in dist_nodes:
2446       dist_nodes.remove(myself.name)
2447
2448     logging.debug("Copying hosts and known_hosts to all nodes")
2449     for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
2450       result = self.rpc.call_upload_file(dist_nodes, fname)
2451       for to_node, to_result in result.iteritems():
2452         if to_result.failed or not to_result.data:
2453           logging.error("Copy of file %s to node %s failed", fname, to_node)
2454
2455     to_copy = []
2456     enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
2457     if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
2458       to_copy.append(constants.VNC_PASSWORD_FILE)
2459
2460     for fname in to_copy:
2461       result = self.rpc.call_upload_file([node], fname)
2462       if result[node].failed or not result[node]:
2463         logging.error("Could not copy file %s to node %s", fname, node)
2464
2465     if self.op.readd:
2466       self.context.ReaddNode(new_node)
2467       # make sure we redistribute the config
2468       self.cfg.Update(new_node)
2469       # and make sure the new node will not have old files around
2470       if not new_node.master_candidate:
2471         result = self.rpc.call_node_demote_from_mc(new_node.name)
2472         msg = result.RemoteFailMsg()
2473         if msg:
2474           self.LogWarning("Node failed to demote itself from master"
2475                           " candidate status: %s" % msg)
2476     else:
2477       self.context.AddNode(new_node)
2478
2479
2480 class LUSetNodeParams(LogicalUnit):
2481   """Modifies the parameters of a node.
2482
2483   """
2484   HPATH = "node-modify"
2485   HTYPE = constants.HTYPE_NODE
2486   _OP_REQP = ["node_name"]
2487   REQ_BGL = False
2488
2489   def CheckArguments(self):
2490     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2491     if node_name is None:
2492       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2493     self.op.node_name = node_name
2494     _CheckBooleanOpField(self.op, 'master_candidate')
2495     _CheckBooleanOpField(self.op, 'offline')
2496     _CheckBooleanOpField(self.op, 'drained')
2497     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2498     if all_mods.count(None) == 3:
2499       raise errors.OpPrereqError("Please pass at least one modification")
2500     if all_mods.count(True) > 1:
2501       raise errors.OpPrereqError("Can't set the node into more than one"
2502                                  " state at the same time")
2503
2504   def ExpandNames(self):
2505     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2506
2507   def BuildHooksEnv(self):
2508     """Build hooks env.
2509
2510     This runs on the master node.
2511
2512     """
2513     env = {
2514       "OP_TARGET": self.op.node_name,
2515       "MASTER_CANDIDATE": str(self.op.master_candidate),
2516       "OFFLINE": str(self.op.offline),
2517       "DRAINED": str(self.op.drained),
2518       }
2519     nl = [self.cfg.GetMasterNode(),
2520           self.op.node_name]
2521     return env, nl, nl
2522
2523   def CheckPrereq(self):
2524     """Check prerequisites.
2525
2526     This only checks the instance list against the existing names.
2527
2528     """
2529     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2530
2531     if (self.op.master_candidate is not None or
2532         self.op.drained is not None or
2533         self.op.offline is not None):
2534       # we can't change the master's node flags
2535       if self.op.node_name == self.cfg.GetMasterNode():
2536         raise errors.OpPrereqError("The master role can be changed"
2537                                    " only via masterfailover")
2538
2539     if ((self.op.master_candidate == False or self.op.offline == True or
2540          self.op.drained == True) and node.master_candidate):
2541       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2542       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2543       if num_candidates <= cp_size:
2544         msg = ("Not enough master candidates (desired"
2545                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2546         if self.op.force:
2547           self.LogWarning(msg)
2548         else:
2549           raise errors.OpPrereqError(msg)
2550
2551     if (self.op.master_candidate == True and
2552         ((node.offline and not self.op.offline == False) or
2553          (node.drained and not self.op.drained == False))):
2554       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2555                                  " to master_candidate" % node.name)
2556
2557     return
2558
2559   def Exec(self, feedback_fn):
2560     """Modifies a node.
2561
2562     """
2563     node = self.node
2564
2565     result = []
2566     changed_mc = False
2567
2568     if self.op.offline is not None:
2569       node.offline = self.op.offline
2570       result.append(("offline", str(self.op.offline)))
2571       if self.op.offline == True:
2572         if node.master_candidate:
2573           node.master_candidate = False
2574           changed_mc = True
2575           result.append(("master_candidate", "auto-demotion due to offline"))
2576         if node.drained:
2577           node.drained = False
2578           result.append(("drained", "clear drained status due to offline"))
2579
2580     if self.op.master_candidate is not None:
2581       node.master_candidate = self.op.master_candidate
2582       changed_mc = True
2583       result.append(("master_candidate", str(self.op.master_candidate)))
2584       if self.op.master_candidate == False:
2585         rrc = self.rpc.call_node_demote_from_mc(node.name)
2586         msg = rrc.RemoteFailMsg()
2587         if msg:
2588           self.LogWarning("Node failed to demote itself: %s" % msg)
2589
2590     if self.op.drained is not None:
2591       node.drained = self.op.drained
2592       result.append(("drained", str(self.op.drained)))
2593       if self.op.drained == True:
2594         if node.master_candidate:
2595           node.master_candidate = False
2596           changed_mc = True
2597           result.append(("master_candidate", "auto-demotion due to drain"))
2598           rrc = self.rpc.call_node_demote_from_mc(node.name)
2599           msg = rrc.RemoteFailMsg()
2600           if msg:
2601             self.LogWarning("Node failed to demote itself: %s" % msg)
2602         if node.offline:
2603           node.offline = False
2604           result.append(("offline", "clear offline status due to drain"))
2605
2606     # this will trigger configuration file update, if needed
2607     self.cfg.Update(node)
2608     # this will trigger job queue propagation or cleanup
2609     if changed_mc:
2610       self.context.ReaddNode(node)
2611
2612     return result
2613
2614
2615 class LUQueryClusterInfo(NoHooksLU):
2616   """Query cluster configuration.
2617
2618   """
2619   _OP_REQP = []
2620   REQ_BGL = False
2621
2622   def ExpandNames(self):
2623     self.needed_locks = {}
2624
2625   def CheckPrereq(self):
2626     """No prerequsites needed for this LU.
2627
2628     """
2629     pass
2630
2631   def Exec(self, feedback_fn):
2632     """Return cluster config.
2633
2634     """
2635     cluster = self.cfg.GetClusterInfo()
2636     result = {
2637       "software_version": constants.RELEASE_VERSION,
2638       "protocol_version": constants.PROTOCOL_VERSION,
2639       "config_version": constants.CONFIG_VERSION,
2640       "os_api_version": constants.OS_API_VERSION,
2641       "export_version": constants.EXPORT_VERSION,
2642       "architecture": (platform.architecture()[0], platform.machine()),
2643       "name": cluster.cluster_name,
2644       "master": cluster.master_node,
2645       "default_hypervisor": cluster.default_hypervisor,
2646       "enabled_hypervisors": cluster.enabled_hypervisors,
2647       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
2648                         for hypervisor_name in cluster.enabled_hypervisors]),
2649       "beparams": cluster.beparams,
2650       "candidate_pool_size": cluster.candidate_pool_size,
2651       "default_bridge": cluster.default_bridge,
2652       "master_netdev": cluster.master_netdev,
2653       "volume_group_name": cluster.volume_group_name,
2654       "file_storage_dir": cluster.file_storage_dir,
2655       "tags": list(cluster.GetTags()),
2656       }
2657
2658     return result
2659
2660
2661 class LUQueryConfigValues(NoHooksLU):
2662   """Return configuration values.
2663
2664   """
2665   _OP_REQP = []
2666   REQ_BGL = False
2667   _FIELDS_DYNAMIC = utils.FieldSet()
2668   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2669
2670   def ExpandNames(self):
2671     self.needed_locks = {}
2672
2673     _CheckOutputFields(static=self._FIELDS_STATIC,
2674                        dynamic=self._FIELDS_DYNAMIC,
2675                        selected=self.op.output_fields)
2676
2677   def CheckPrereq(self):
2678     """No prerequisites.
2679
2680     """
2681     pass
2682
2683   def Exec(self, feedback_fn):
2684     """Dump a representation of the cluster config to the standard output.
2685
2686     """
2687     values = []
2688     for field in self.op.output_fields:
2689       if field == "cluster_name":
2690         entry = self.cfg.GetClusterName()
2691       elif field == "master_node":
2692         entry = self.cfg.GetMasterNode()
2693       elif field == "drain_flag":
2694         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2695       else:
2696         raise errors.ParameterError(field)
2697       values.append(entry)
2698     return values
2699
2700
2701 class LUActivateInstanceDisks(NoHooksLU):
2702   """Bring up an instance's disks.
2703
2704   """
2705   _OP_REQP = ["instance_name"]
2706   REQ_BGL = False
2707
2708   def ExpandNames(self):
2709     self._ExpandAndLockInstance()
2710     self.needed_locks[locking.LEVEL_NODE] = []
2711     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2712
2713   def DeclareLocks(self, level):
2714     if level == locking.LEVEL_NODE:
2715       self._LockInstancesNodes()
2716
2717   def CheckPrereq(self):
2718     """Check prerequisites.
2719
2720     This checks that the instance is in the cluster.
2721
2722     """
2723     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2724     assert self.instance is not None, \
2725       "Cannot retrieve locked instance %s" % self.op.instance_name
2726     _CheckNodeOnline(self, self.instance.primary_node)
2727     if not hasattr(self.op, "ignore_size"):
2728       self.op.ignore_size = False
2729
2730   def Exec(self, feedback_fn):
2731     """Activate the disks.
2732
2733     """
2734     disks_ok, disks_info = \
2735               _AssembleInstanceDisks(self, self.instance,
2736                                      ignore_size=self.op.ignore_size)
2737     if not disks_ok:
2738       raise errors.OpExecError("Cannot activate block devices")
2739
2740     return disks_info
2741
2742
2743 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
2744                            ignore_size=False):
2745   """Prepare the block devices for an instance.
2746
2747   This sets up the block devices on all nodes.
2748
2749   @type lu: L{LogicalUnit}
2750   @param lu: the logical unit on whose behalf we execute
2751   @type instance: L{objects.Instance}
2752   @param instance: the instance for whose disks we assemble
2753   @type ignore_secondaries: boolean
2754   @param ignore_secondaries: if true, errors on secondary nodes
2755       won't result in an error return from the function
2756   @type ignore_size: boolean
2757   @param ignore_size: if true, the current known size of the disk
2758       will not be used during the disk activation, useful for cases
2759       when the size is wrong
2760   @return: False if the operation failed, otherwise a list of
2761       (host, instance_visible_name, node_visible_name)
2762       with the mapping from node devices to instance devices
2763
2764   """
2765   device_info = []
2766   disks_ok = True
2767   iname = instance.name
2768   # With the two passes mechanism we try to reduce the window of
2769   # opportunity for the race condition of switching DRBD to primary
2770   # before handshaking occured, but we do not eliminate it
2771
2772   # The proper fix would be to wait (with some limits) until the
2773   # connection has been made and drbd transitions from WFConnection
2774   # into any other network-connected state (Connected, SyncTarget,
2775   # SyncSource, etc.)
2776
2777   # 1st pass, assemble on all nodes in secondary mode
2778   for inst_disk in instance.disks:
2779     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2780       if ignore_size:
2781         node_disk = node_disk.Copy()
2782         node_disk.UnsetSize()
2783       lu.cfg.SetDiskID(node_disk, node)
2784       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2785       msg = result.RemoteFailMsg()
2786       if msg:
2787         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2788                            " (is_primary=False, pass=1): %s",
2789                            inst_disk.iv_name, node, msg)
2790         if not ignore_secondaries:
2791           disks_ok = False
2792
2793   # FIXME: race condition on drbd migration to primary
2794
2795   # 2nd pass, do only the primary node
2796   for inst_disk in instance.disks:
2797     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2798       if node != instance.primary_node:
2799         continue
2800       if ignore_size:
2801         node_disk = node_disk.Copy()
2802         node_disk.UnsetSize()
2803       lu.cfg.SetDiskID(node_disk, node)
2804       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2805       msg = result.RemoteFailMsg()
2806       if msg:
2807         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2808                            " (is_primary=True, pass=2): %s",
2809                            inst_disk.iv_name, node, msg)
2810         disks_ok = False
2811     device_info.append((instance.primary_node, inst_disk.iv_name,
2812                         result.payload))
2813
2814   # leave the disks configured for the primary node
2815   # this is a workaround that would be fixed better by
2816   # improving the logical/physical id handling
2817   for disk in instance.disks:
2818     lu.cfg.SetDiskID(disk, instance.primary_node)
2819
2820   return disks_ok, device_info
2821
2822
2823 def _StartInstanceDisks(lu, instance, force):
2824   """Start the disks of an instance.
2825
2826   """
2827   disks_ok, _ = _AssembleInstanceDisks(lu, instance,
2828                                            ignore_secondaries=force)
2829   if not disks_ok:
2830     _ShutdownInstanceDisks(lu, instance)
2831     if force is not None and not force:
2832       lu.proc.LogWarning("", hint="If the message above refers to a"
2833                          " secondary node,"
2834                          " you can retry the operation using '--force'.")
2835     raise errors.OpExecError("Disk consistency error")
2836
2837
2838 class LUDeactivateInstanceDisks(NoHooksLU):
2839   """Shutdown an instance's disks.
2840
2841   """
2842   _OP_REQP = ["instance_name"]
2843   REQ_BGL = False
2844
2845   def ExpandNames(self):
2846     self._ExpandAndLockInstance()
2847     self.needed_locks[locking.LEVEL_NODE] = []
2848     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2849
2850   def DeclareLocks(self, level):
2851     if level == locking.LEVEL_NODE:
2852       self._LockInstancesNodes()
2853
2854   def CheckPrereq(self):
2855     """Check prerequisites.
2856
2857     This checks that the instance is in the cluster.
2858
2859     """
2860     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2861     assert self.instance is not None, \
2862       "Cannot retrieve locked instance %s" % self.op.instance_name
2863
2864   def Exec(self, feedback_fn):
2865     """Deactivate the disks
2866
2867     """
2868     instance = self.instance
2869     _SafeShutdownInstanceDisks(self, instance)
2870
2871
2872 def _SafeShutdownInstanceDisks(lu, instance):
2873   """Shutdown block devices of an instance.
2874
2875   This function checks if an instance is running, before calling
2876   _ShutdownInstanceDisks.
2877
2878   """
2879   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2880                                       [instance.hypervisor])
2881   ins_l = ins_l[instance.primary_node]
2882   if ins_l.failed or not isinstance(ins_l.data, list):
2883     raise errors.OpExecError("Can't contact node '%s'" %
2884                              instance.primary_node)
2885
2886   if instance.name in ins_l.data:
2887     raise errors.OpExecError("Instance is running, can't shutdown"
2888                              " block devices.")
2889
2890   _ShutdownInstanceDisks(lu, instance)
2891
2892
2893 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2894   """Shutdown block devices of an instance.
2895
2896   This does the shutdown on all nodes of the instance.
2897
2898   If the ignore_primary is false, errors on the primary node are
2899   ignored.
2900
2901   """
2902   all_result = True
2903   for disk in instance.disks:
2904     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2905       lu.cfg.SetDiskID(top_disk, node)
2906       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2907       msg = result.RemoteFailMsg()
2908       if msg:
2909         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2910                       disk.iv_name, node, msg)
2911         if not ignore_primary or node != instance.primary_node:
2912           all_result = False
2913   return all_result
2914
2915
2916 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2917   """Checks if a node has enough free memory.
2918
2919   This function check if a given node has the needed amount of free
2920   memory. In case the node has less memory or we cannot get the
2921   information from the node, this function raise an OpPrereqError
2922   exception.
2923
2924   @type lu: C{LogicalUnit}
2925   @param lu: a logical unit from which we get configuration data
2926   @type node: C{str}
2927   @param node: the node to check
2928   @type reason: C{str}
2929   @param reason: string to use in the error message
2930   @type requested: C{int}
2931   @param requested: the amount of memory in MiB to check for
2932   @type hypervisor_name: C{str}
2933   @param hypervisor_name: the hypervisor to ask for memory stats
2934   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2935       we cannot check the node
2936
2937   """
2938   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2939   nodeinfo[node].Raise()
2940   free_mem = nodeinfo[node].data.get('memory_free')
2941   if not isinstance(free_mem, int):
2942     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2943                              " was '%s'" % (node, free_mem))
2944   if requested > free_mem:
2945     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2946                              " needed %s MiB, available %s MiB" %
2947                              (node, reason, requested, free_mem))
2948
2949
2950 class LUStartupInstance(LogicalUnit):
2951   """Starts an instance.
2952
2953   """
2954   HPATH = "instance-start"
2955   HTYPE = constants.HTYPE_INSTANCE
2956   _OP_REQP = ["instance_name", "force"]
2957   REQ_BGL = False
2958
2959   def ExpandNames(self):
2960     self._ExpandAndLockInstance()
2961
2962   def BuildHooksEnv(self):
2963     """Build hooks env.
2964
2965     This runs on master, primary and secondary nodes of the instance.
2966
2967     """
2968     env = {
2969       "FORCE": self.op.force,
2970       }
2971     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2972     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2973     return env, nl, nl
2974
2975   def CheckPrereq(self):
2976     """Check prerequisites.
2977
2978     This checks that the instance is in the cluster.
2979
2980     """
2981     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2982     assert self.instance is not None, \
2983       "Cannot retrieve locked instance %s" % self.op.instance_name
2984
2985     # extra beparams
2986     self.beparams = getattr(self.op, "beparams", {})
2987     if self.beparams:
2988       if not isinstance(self.beparams, dict):
2989         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2990                                    " dict" % (type(self.beparams), ))
2991       # fill the beparams dict
2992       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2993       self.op.beparams = self.beparams
2994
2995     # extra hvparams
2996     self.hvparams = getattr(self.op, "hvparams", {})
2997     if self.hvparams:
2998       if not isinstance(self.hvparams, dict):
2999         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
3000                                    " dict" % (type(self.hvparams), ))
3001
3002       # check hypervisor parameter syntax (locally)
3003       cluster = self.cfg.GetClusterInfo()
3004       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
3005       filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
3006                                     instance.hvparams)
3007       filled_hvp.update(self.hvparams)
3008       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
3009       hv_type.CheckParameterSyntax(filled_hvp)
3010       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
3011       self.op.hvparams = self.hvparams
3012
3013     _CheckNodeOnline(self, instance.primary_node)
3014
3015     bep = self.cfg.GetClusterInfo().FillBE(instance)
3016     # check bridges existence
3017     _CheckInstanceBridgesExist(self, instance)
3018
3019     remote_info = self.rpc.call_instance_info(instance.primary_node,
3020                                               instance.name,
3021                                               instance.hypervisor)
3022     remote_info.Raise()
3023     if not remote_info.data:
3024       _CheckNodeFreeMemory(self, instance.primary_node,
3025                            "starting instance %s" % instance.name,
3026                            bep[constants.BE_MEMORY], instance.hypervisor)
3027
3028   def Exec(self, feedback_fn):
3029     """Start the instance.
3030
3031     """
3032     instance = self.instance
3033     force = self.op.force
3034
3035     self.cfg.MarkInstanceUp(instance.name)
3036
3037     node_current = instance.primary_node
3038
3039     _StartInstanceDisks(self, instance, force)
3040
3041     result = self.rpc.call_instance_start(node_current, instance,
3042                                           self.hvparams, self.beparams)
3043     msg = result.RemoteFailMsg()
3044     if msg:
3045       _ShutdownInstanceDisks(self, instance)
3046       raise errors.OpExecError("Could not start instance: %s" % msg)
3047
3048
3049 class LURebootInstance(LogicalUnit):
3050   """Reboot an instance.
3051
3052   """
3053   HPATH = "instance-reboot"
3054   HTYPE = constants.HTYPE_INSTANCE
3055   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
3056   REQ_BGL = False
3057
3058   def ExpandNames(self):
3059     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
3060                                    constants.INSTANCE_REBOOT_HARD,
3061                                    constants.INSTANCE_REBOOT_FULL]:
3062       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
3063                                   (constants.INSTANCE_REBOOT_SOFT,
3064                                    constants.INSTANCE_REBOOT_HARD,
3065                                    constants.INSTANCE_REBOOT_FULL))
3066     self._ExpandAndLockInstance()
3067
3068   def BuildHooksEnv(self):
3069     """Build hooks env.
3070
3071     This runs on master, primary and secondary nodes of the instance.
3072
3073     """
3074     env = {
3075       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
3076       "REBOOT_TYPE": self.op.reboot_type,
3077       }
3078     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3079     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3080     return env, nl, nl
3081
3082   def CheckPrereq(self):
3083     """Check prerequisites.
3084
3085     This checks that the instance is in the cluster.
3086
3087     """
3088     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3089     assert self.instance is not None, \
3090       "Cannot retrieve locked instance %s" % self.op.instance_name
3091
3092     _CheckNodeOnline(self, instance.primary_node)
3093
3094     # check bridges existence
3095     _CheckInstanceBridgesExist(self, instance)
3096
3097   def Exec(self, feedback_fn):
3098     """Reboot the instance.
3099
3100     """
3101     instance = self.instance
3102     ignore_secondaries = self.op.ignore_secondaries
3103     reboot_type = self.op.reboot_type
3104
3105     node_current = instance.primary_node
3106
3107     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3108                        constants.INSTANCE_REBOOT_HARD]:
3109       for disk in instance.disks:
3110         self.cfg.SetDiskID(disk, node_current)
3111       result = self.rpc.call_instance_reboot(node_current, instance,
3112                                              reboot_type)
3113       msg = result.RemoteFailMsg()
3114       if msg:
3115         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3116     else:
3117       result = self.rpc.call_instance_shutdown(node_current, instance)
3118       msg = result.RemoteFailMsg()
3119       if msg:
3120         raise errors.OpExecError("Could not shutdown instance for"
3121                                  " full reboot: %s" % msg)
3122       _ShutdownInstanceDisks(self, instance)
3123       _StartInstanceDisks(self, instance, ignore_secondaries)
3124       result = self.rpc.call_instance_start(node_current, instance, None, None)
3125       msg = result.RemoteFailMsg()
3126       if msg:
3127         _ShutdownInstanceDisks(self, instance)
3128         raise errors.OpExecError("Could not start instance for"
3129                                  " full reboot: %s" % msg)
3130
3131     self.cfg.MarkInstanceUp(instance.name)
3132
3133
3134 class LUShutdownInstance(LogicalUnit):
3135   """Shutdown an instance.
3136
3137   """
3138   HPATH = "instance-stop"
3139   HTYPE = constants.HTYPE_INSTANCE
3140   _OP_REQP = ["instance_name"]
3141   REQ_BGL = False
3142
3143   def ExpandNames(self):
3144     self._ExpandAndLockInstance()
3145
3146   def BuildHooksEnv(self):
3147     """Build hooks env.
3148
3149     This runs on master, primary and secondary nodes of the instance.
3150
3151     """
3152     env = _BuildInstanceHookEnvByObject(self, self.instance)
3153     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3154     return env, nl, nl
3155
3156   def CheckPrereq(self):
3157     """Check prerequisites.
3158
3159     This checks that the instance is in the cluster.
3160
3161     """
3162     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3163     assert self.instance is not None, \
3164       "Cannot retrieve locked instance %s" % self.op.instance_name
3165     _CheckNodeOnline(self, self.instance.primary_node)
3166
3167   def Exec(self, feedback_fn):
3168     """Shutdown the instance.
3169
3170     """
3171     instance = self.instance
3172     node_current = instance.primary_node
3173     self.cfg.MarkInstanceDown(instance.name)
3174     result = self.rpc.call_instance_shutdown(node_current, instance)
3175     msg = result.RemoteFailMsg()
3176     if msg:
3177       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3178
3179     _ShutdownInstanceDisks(self, instance)
3180
3181
3182 class LUReinstallInstance(LogicalUnit):
3183   """Reinstall an instance.
3184
3185   """
3186   HPATH = "instance-reinstall"
3187   HTYPE = constants.HTYPE_INSTANCE
3188   _OP_REQP = ["instance_name"]
3189   REQ_BGL = False
3190
3191   def ExpandNames(self):
3192     self._ExpandAndLockInstance()
3193
3194   def BuildHooksEnv(self):
3195     """Build hooks env.
3196
3197     This runs on master, primary and secondary nodes of the instance.
3198
3199     """
3200     env = _BuildInstanceHookEnvByObject(self, self.instance)
3201     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3202     return env, nl, nl
3203
3204   def CheckPrereq(self):
3205     """Check prerequisites.
3206
3207     This checks that the instance is in the cluster and is not running.
3208
3209     """
3210     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3211     assert instance is not None, \
3212       "Cannot retrieve locked instance %s" % self.op.instance_name
3213     _CheckNodeOnline(self, instance.primary_node)
3214
3215     if instance.disk_template == constants.DT_DISKLESS:
3216       raise errors.OpPrereqError("Instance '%s' has no disks" %
3217                                  self.op.instance_name)
3218     if instance.admin_up:
3219       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3220                                  self.op.instance_name)
3221     remote_info = self.rpc.call_instance_info(instance.primary_node,
3222                                               instance.name,
3223                                               instance.hypervisor)
3224     remote_info.Raise()
3225     if remote_info.data:
3226       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3227                                  (self.op.instance_name,
3228                                   instance.primary_node))
3229
3230     self.op.os_type = getattr(self.op, "os_type", None)
3231     if self.op.os_type is not None:
3232       # OS verification
3233       pnode = self.cfg.GetNodeInfo(
3234         self.cfg.ExpandNodeName(instance.primary_node))
3235       if pnode is None:
3236         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3237                                    self.op.pnode)
3238       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3239       result.Raise()
3240       if not isinstance(result.data, objects.OS):
3241         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3242                                    " primary node"  % self.op.os_type)
3243
3244     self.instance = instance
3245
3246   def Exec(self, feedback_fn):
3247     """Reinstall the instance.
3248
3249     """
3250     inst = self.instance
3251
3252     if self.op.os_type is not None:
3253       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3254       inst.os = self.op.os_type
3255       self.cfg.Update(inst)
3256
3257     _StartInstanceDisks(self, inst, None)
3258     try:
3259       feedback_fn("Running the instance OS create scripts...")
3260       result = self.rpc.call_instance_os_add(inst.primary_node, inst)
3261       msg = result.RemoteFailMsg()
3262       if msg:
3263         raise errors.OpExecError("Could not install OS for instance %s"
3264                                  " on node %s: %s" %
3265                                  (inst.name, inst.primary_node, msg))
3266     finally:
3267       _ShutdownInstanceDisks(self, inst)
3268
3269
3270 class LURenameInstance(LogicalUnit):
3271   """Rename an instance.
3272
3273   """
3274   HPATH = "instance-rename"
3275   HTYPE = constants.HTYPE_INSTANCE
3276   _OP_REQP = ["instance_name", "new_name"]
3277
3278   def BuildHooksEnv(self):
3279     """Build hooks env.
3280
3281     This runs on master, primary and secondary nodes of the instance.
3282
3283     """
3284     env = _BuildInstanceHookEnvByObject(self, self.instance)
3285     env["INSTANCE_NEW_NAME"] = self.op.new_name
3286     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3287     return env, nl, nl
3288
3289   def CheckPrereq(self):
3290     """Check prerequisites.
3291
3292     This checks that the instance is in the cluster and is not running.
3293
3294     """
3295     instance = self.cfg.GetInstanceInfo(
3296       self.cfg.ExpandInstanceName(self.op.instance_name))
3297     if instance is None:
3298       raise errors.OpPrereqError("Instance '%s' not known" %
3299                                  self.op.instance_name)
3300     _CheckNodeOnline(self, instance.primary_node)
3301
3302     if instance.admin_up:
3303       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3304                                  self.op.instance_name)
3305     remote_info = self.rpc.call_instance_info(instance.primary_node,
3306                                               instance.name,
3307                                               instance.hypervisor)
3308     remote_info.Raise()
3309     if remote_info.data:
3310       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3311                                  (self.op.instance_name,
3312                                   instance.primary_node))
3313     self.instance = instance
3314
3315     # new name verification
3316     name_info = utils.HostInfo(self.op.new_name)
3317
3318     self.op.new_name = new_name = name_info.name
3319     instance_list = self.cfg.GetInstanceList()
3320     if new_name in instance_list:
3321       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3322                                  new_name)
3323
3324     if not getattr(self.op, "ignore_ip", False):
3325       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3326         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3327                                    (name_info.ip, new_name))
3328
3329
3330   def Exec(self, feedback_fn):
3331     """Reinstall the instance.
3332
3333     """
3334     inst = self.instance
3335     old_name = inst.name
3336
3337     if inst.disk_template == constants.DT_FILE:
3338       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3339
3340     self.cfg.RenameInstance(inst.name, self.op.new_name)
3341     # Change the instance lock. This is definitely safe while we hold the BGL
3342     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3343     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3344
3345     # re-read the instance from the configuration after rename
3346     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3347
3348     if inst.disk_template == constants.DT_FILE:
3349       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3350       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3351                                                      old_file_storage_dir,
3352                                                      new_file_storage_dir)
3353       result.Raise()
3354       if not result.data:
3355         raise errors.OpExecError("Could not connect to node '%s' to rename"
3356                                  " directory '%s' to '%s' (but the instance"
3357                                  " has been renamed in Ganeti)" % (
3358                                  inst.primary_node, old_file_storage_dir,
3359                                  new_file_storage_dir))
3360
3361       if not result.data[0]:
3362         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3363                                  " (but the instance has been renamed in"
3364                                  " Ganeti)" % (old_file_storage_dir,
3365                                                new_file_storage_dir))
3366
3367     _StartInstanceDisks(self, inst, None)
3368     try:
3369       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3370                                                  old_name)
3371       msg = result.RemoteFailMsg()
3372       if msg:
3373         msg = ("Could not run OS rename script for instance %s on node %s"
3374                " (but the instance has been renamed in Ganeti): %s" %
3375                (inst.name, inst.primary_node, msg))
3376         self.proc.LogWarning(msg)
3377     finally:
3378       _ShutdownInstanceDisks(self, inst)
3379
3380
3381 class LURemoveInstance(LogicalUnit):
3382   """Remove an instance.
3383
3384   """
3385   HPATH = "instance-remove"
3386   HTYPE = constants.HTYPE_INSTANCE
3387   _OP_REQP = ["instance_name", "ignore_failures"]
3388   REQ_BGL = False
3389
3390   def ExpandNames(self):
3391     self._ExpandAndLockInstance()
3392     self.needed_locks[locking.LEVEL_NODE] = []
3393     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3394
3395   def DeclareLocks(self, level):
3396     if level == locking.LEVEL_NODE:
3397       self._LockInstancesNodes()
3398
3399   def BuildHooksEnv(self):
3400     """Build hooks env.
3401
3402     This runs on master, primary and secondary nodes of the instance.
3403
3404     """
3405     env = _BuildInstanceHookEnvByObject(self, self.instance)
3406     nl = [self.cfg.GetMasterNode()]
3407     return env, nl, nl
3408
3409   def CheckPrereq(self):
3410     """Check prerequisites.
3411
3412     This checks that the instance is in the cluster.
3413
3414     """
3415     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3416     assert self.instance is not None, \
3417       "Cannot retrieve locked instance %s" % self.op.instance_name
3418
3419   def Exec(self, feedback_fn):
3420     """Remove the instance.
3421
3422     """
3423     instance = self.instance
3424     logging.info("Shutting down instance %s on node %s",
3425                  instance.name, instance.primary_node)
3426
3427     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3428     msg = result.RemoteFailMsg()
3429     if msg:
3430       if self.op.ignore_failures:
3431         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3432       else:
3433         raise errors.OpExecError("Could not shutdown instance %s on"
3434                                  " node %s: %s" %
3435                                  (instance.name, instance.primary_node, msg))
3436
3437     logging.info("Removing block devices for instance %s", instance.name)
3438
3439     if not _RemoveDisks(self, instance):
3440       if self.op.ignore_failures:
3441         feedback_fn("Warning: can't remove instance's disks")
3442       else:
3443         raise errors.OpExecError("Can't remove instance's disks")
3444
3445     logging.info("Removing instance %s out of cluster config", instance.name)
3446
3447     self.cfg.RemoveInstance(instance.name)
3448     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3449
3450
3451 class LUQueryInstances(NoHooksLU):
3452   """Logical unit for querying instances.
3453
3454   """
3455   _OP_REQP = ["output_fields", "names", "use_locking"]
3456   REQ_BGL = False
3457   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3458                                     "admin_state",
3459                                     "disk_template", "ip", "mac", "bridge",
3460                                     "sda_size", "sdb_size", "vcpus", "tags",
3461                                     "network_port", "beparams",
3462                                     r"(disk)\.(size)/([0-9]+)",
3463                                     r"(disk)\.(sizes)", "disk_usage",
3464                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3465                                     r"(nic)\.(macs|ips|bridges)",
3466                                     r"(disk|nic)\.(count)",
3467                                     "serial_no", "hypervisor", "hvparams",] +
3468                                   ["hv/%s" % name
3469                                    for name in constants.HVS_PARAMETERS] +
3470                                   ["be/%s" % name
3471                                    for name in constants.BES_PARAMETERS])
3472   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3473
3474
3475   def ExpandNames(self):
3476     _CheckOutputFields(static=self._FIELDS_STATIC,
3477                        dynamic=self._FIELDS_DYNAMIC,
3478                        selected=self.op.output_fields)
3479
3480     self.needed_locks = {}
3481     self.share_locks[locking.LEVEL_INSTANCE] = 1
3482     self.share_locks[locking.LEVEL_NODE] = 1
3483
3484     if self.op.names:
3485       self.wanted = _GetWantedInstances(self, self.op.names)
3486     else:
3487       self.wanted = locking.ALL_SET
3488
3489     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3490     self.do_locking = self.do_node_query and self.op.use_locking
3491     if self.do_locking:
3492       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3493       self.needed_locks[locking.LEVEL_NODE] = []
3494       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3495
3496   def DeclareLocks(self, level):
3497     if level == locking.LEVEL_NODE and self.do_locking:
3498       self._LockInstancesNodes()
3499
3500   def CheckPrereq(self):
3501     """Check prerequisites.
3502
3503     """
3504     pass
3505
3506   def Exec(self, feedback_fn):
3507     """Computes the list of nodes and their attributes.
3508
3509     """
3510     all_info = self.cfg.GetAllInstancesInfo()
3511     if self.wanted == locking.ALL_SET:
3512       # caller didn't specify instance names, so ordering is not important
3513       if self.do_locking:
3514         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3515       else:
3516         instance_names = all_info.keys()
3517       instance_names = utils.NiceSort(instance_names)
3518     else:
3519       # caller did specify names, so we must keep the ordering
3520       if self.do_locking:
3521         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3522       else:
3523         tgt_set = all_info.keys()
3524       missing = set(self.wanted).difference(tgt_set)
3525       if missing:
3526         raise errors.OpExecError("Some instances were removed before"
3527                                  " retrieving their data: %s" % missing)
3528       instance_names = self.wanted
3529
3530     instance_list = [all_info[iname] for iname in instance_names]
3531
3532     # begin data gathering
3533
3534     nodes = frozenset([inst.primary_node for inst in instance_list])
3535     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3536
3537     bad_nodes = []
3538     off_nodes = []
3539     if self.do_node_query:
3540       live_data = {}
3541       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3542       for name in nodes:
3543         result = node_data[name]
3544         if result.offline:
3545           # offline nodes will be in both lists
3546           off_nodes.append(name)
3547         if result.failed:
3548           bad_nodes.append(name)
3549         else:
3550           if result.data:
3551             live_data.update(result.data)
3552             # else no instance is alive
3553     else:
3554       live_data = dict([(name, {}) for name in instance_names])
3555
3556     # end data gathering
3557
3558     HVPREFIX = "hv/"
3559     BEPREFIX = "be/"
3560     output = []
3561     for instance in instance_list:
3562       iout = []
3563       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3564       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3565       for field in self.op.output_fields:
3566         st_match = self._FIELDS_STATIC.Matches(field)
3567         if field == "name":
3568           val = instance.name
3569         elif field == "os":
3570           val = instance.os
3571         elif field == "pnode":
3572           val = instance.primary_node
3573         elif field == "snodes":
3574           val = list(instance.secondary_nodes)
3575         elif field == "admin_state":
3576           val = instance.admin_up
3577         elif field == "oper_state":
3578           if instance.primary_node in bad_nodes:
3579             val = None
3580           else:
3581             val = bool(live_data.get(instance.name))
3582         elif field == "status":
3583           if instance.primary_node in off_nodes:
3584             val = "ERROR_nodeoffline"
3585           elif instance.primary_node in bad_nodes:
3586             val = "ERROR_nodedown"
3587           else:
3588             running = bool(live_data.get(instance.name))
3589             if running:
3590               if instance.admin_up:
3591                 val = "running"
3592               else:
3593                 val = "ERROR_up"
3594             else:
3595               if instance.admin_up:
3596                 val = "ERROR_down"
3597               else:
3598                 val = "ADMIN_down"
3599         elif field == "oper_ram":
3600           if instance.primary_node in bad_nodes:
3601             val = None
3602           elif instance.name in live_data:
3603             val = live_data[instance.name].get("memory", "?")
3604           else:
3605             val = "-"
3606         elif field == "vcpus":
3607           val = i_be[constants.BE_VCPUS]
3608         elif field == "disk_template":
3609           val = instance.disk_template
3610         elif field == "ip":
3611           if instance.nics:
3612             val = instance.nics[0].ip
3613           else:
3614             val = None
3615         elif field == "bridge":
3616           if instance.nics:
3617             val = instance.nics[0].bridge
3618           else:
3619             val = None
3620         elif field == "mac":
3621           if instance.nics:
3622             val = instance.nics[0].mac
3623           else:
3624             val = None
3625         elif field == "sda_size" or field == "sdb_size":
3626           idx = ord(field[2]) - ord('a')
3627           try:
3628             val = instance.FindDisk(idx).size
3629           except errors.OpPrereqError:
3630             val = None
3631         elif field == "disk_usage": # total disk usage per node
3632           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3633           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3634         elif field == "tags":
3635           val = list(instance.GetTags())
3636         elif field == "serial_no":
3637           val = instance.serial_no
3638         elif field == "network_port":
3639           val = instance.network_port
3640         elif field == "hypervisor":
3641           val = instance.hypervisor
3642         elif field == "hvparams":
3643           val = i_hv
3644         elif (field.startswith(HVPREFIX) and
3645               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3646           val = i_hv.get(field[len(HVPREFIX):], None)
3647         elif field == "beparams":
3648           val = i_be
3649         elif (field.startswith(BEPREFIX) and
3650               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3651           val = i_be.get(field[len(BEPREFIX):], None)
3652         elif st_match and st_match.groups():
3653           # matches a variable list
3654           st_groups = st_match.groups()
3655           if st_groups and st_groups[0] == "disk":
3656             if st_groups[1] == "count":
3657               val = len(instance.disks)
3658             elif st_groups[1] == "sizes":
3659               val = [disk.size for disk in instance.disks]
3660             elif st_groups[1] == "size":
3661               try:
3662                 val = instance.FindDisk(st_groups[2]).size
3663               except errors.OpPrereqError:
3664                 val = None
3665             else:
3666               assert False, "Unhandled disk parameter"
3667           elif st_groups[0] == "nic":
3668             if st_groups[1] == "count":
3669               val = len(instance.nics)
3670             elif st_groups[1] == "macs":
3671               val = [nic.mac for nic in instance.nics]
3672             elif st_groups[1] == "ips":
3673               val = [nic.ip for nic in instance.nics]
3674             elif st_groups[1] == "bridges":
3675               val = [nic.bridge for nic in instance.nics]
3676             else:
3677               # index-based item
3678               nic_idx = int(st_groups[2])
3679               if nic_idx >= len(instance.nics):
3680                 val = None
3681               else:
3682                 if st_groups[1] == "mac":
3683                   val = instance.nics[nic_idx].mac
3684                 elif st_groups[1] == "ip":
3685                   val = instance.nics[nic_idx].ip
3686                 elif st_groups[1] == "bridge":
3687                   val = instance.nics[nic_idx].bridge
3688                 else:
3689                   assert False, "Unhandled NIC parameter"
3690           else:
3691             assert False, ("Declared but unhandled variable parameter '%s'" %
3692                            field)
3693         else:
3694           assert False, "Declared but unhandled parameter '%s'" % field
3695         iout.append(val)
3696       output.append(iout)
3697
3698     return output
3699
3700
3701 class LUFailoverInstance(LogicalUnit):
3702   """Failover an instance.
3703
3704   """
3705   HPATH = "instance-failover"
3706   HTYPE = constants.HTYPE_INSTANCE
3707   _OP_REQP = ["instance_name", "ignore_consistency"]
3708   REQ_BGL = False
3709
3710   def ExpandNames(self):
3711     self._ExpandAndLockInstance()
3712     self.needed_locks[locking.LEVEL_NODE] = []
3713     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3714
3715   def DeclareLocks(self, level):
3716     if level == locking.LEVEL_NODE:
3717       self._LockInstancesNodes()
3718
3719   def BuildHooksEnv(self):
3720     """Build hooks env.
3721
3722     This runs on master, primary and secondary nodes of the instance.
3723
3724     """
3725     env = {
3726       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3727       }
3728     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3729     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3730     return env, nl, nl
3731
3732   def CheckPrereq(self):
3733     """Check prerequisites.
3734
3735     This checks that the instance is in the cluster.
3736
3737     """
3738     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3739     assert self.instance is not None, \
3740       "Cannot retrieve locked instance %s" % self.op.instance_name
3741
3742     bep = self.cfg.GetClusterInfo().FillBE(instance)
3743     if instance.disk_template not in constants.DTS_NET_MIRROR:
3744       raise errors.OpPrereqError("Instance's disk layout is not"
3745                                  " network mirrored, cannot failover.")
3746
3747     secondary_nodes = instance.secondary_nodes
3748     if not secondary_nodes:
3749       raise errors.ProgrammerError("no secondary node but using "
3750                                    "a mirrored disk template")
3751
3752     target_node = secondary_nodes[0]
3753     _CheckNodeOnline(self, target_node)
3754     _CheckNodeNotDrained(self, target_node)
3755
3756     if instance.admin_up:
3757       # check memory requirements on the secondary node
3758       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3759                            instance.name, bep[constants.BE_MEMORY],
3760                            instance.hypervisor)
3761     else:
3762       self.LogInfo("Not checking memory on the secondary node as"
3763                    " instance will not be started")
3764
3765     # check bridge existence
3766     brlist = [nic.bridge for nic in instance.nics]
3767     result = self.rpc.call_bridges_exist(target_node, brlist)
3768     result.Raise()
3769     if not result.data:
3770       raise errors.OpPrereqError("One or more target bridges %s does not"
3771                                  " exist on destination node '%s'" %
3772                                  (brlist, target_node))
3773
3774   def Exec(self, feedback_fn):
3775     """Failover an instance.
3776
3777     The failover is done by shutting it down on its present node and
3778     starting it on the secondary.
3779
3780     """
3781     instance = self.instance
3782
3783     source_node = instance.primary_node
3784     target_node = instance.secondary_nodes[0]
3785
3786     feedback_fn("* checking disk consistency between source and target")
3787     for dev in instance.disks:
3788       # for drbd, these are drbd over lvm
3789       if not _CheckDiskConsistency(self, dev, target_node, False):
3790         if instance.admin_up and not self.op.ignore_consistency:
3791           raise errors.OpExecError("Disk %s is degraded on target node,"
3792                                    " aborting failover." % dev.iv_name)
3793
3794     feedback_fn("* shutting down instance on source node")
3795     logging.info("Shutting down instance %s on node %s",
3796                  instance.name, source_node)
3797
3798     result = self.rpc.call_instance_shutdown(source_node, instance)
3799     msg = result.RemoteFailMsg()
3800     if msg:
3801       if self.op.ignore_consistency:
3802         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3803                              " Proceeding anyway. Please make sure node"
3804                              " %s is down. Error details: %s",
3805                              instance.name, source_node, source_node, msg)
3806       else:
3807         raise errors.OpExecError("Could not shutdown instance %s on"
3808                                  " node %s: %s" %
3809                                  (instance.name, source_node, msg))
3810
3811     feedback_fn("* deactivating the instance's disks on source node")
3812     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3813       raise errors.OpExecError("Can't shut down the instance's disks.")
3814
3815     instance.primary_node = target_node
3816     # distribute new instance config to the other nodes
3817     self.cfg.Update(instance)
3818
3819     # Only start the instance if it's marked as up
3820     if instance.admin_up:
3821       feedback_fn("* activating the instance's disks on target node")
3822       logging.info("Starting instance %s on node %s",
3823                    instance.name, target_node)
3824
3825       disks_ok, _ = _AssembleInstanceDisks(self, instance,
3826                                                ignore_secondaries=True)
3827       if not disks_ok:
3828         _ShutdownInstanceDisks(self, instance)
3829         raise errors.OpExecError("Can't activate the instance's disks")
3830
3831       feedback_fn("* starting the instance on the target node")
3832       result = self.rpc.call_instance_start(target_node, instance, None, None)
3833       msg = result.RemoteFailMsg()
3834       if msg:
3835         _ShutdownInstanceDisks(self, instance)
3836         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3837                                  (instance.name, target_node, msg))
3838
3839
3840 class LUMigrateInstance(LogicalUnit):
3841   """Migrate an instance.
3842
3843   This is migration without shutting down, compared to the failover,
3844   which is done with shutdown.
3845
3846   """
3847   HPATH = "instance-migrate"
3848   HTYPE = constants.HTYPE_INSTANCE
3849   _OP_REQP = ["instance_name", "live", "cleanup"]
3850
3851   REQ_BGL = False
3852
3853   def ExpandNames(self):
3854     self._ExpandAndLockInstance()
3855     self.needed_locks[locking.LEVEL_NODE] = []
3856     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3857
3858   def DeclareLocks(self, level):
3859     if level == locking.LEVEL_NODE:
3860       self._LockInstancesNodes()
3861
3862   def BuildHooksEnv(self):
3863     """Build hooks env.
3864
3865     This runs on master, primary and secondary nodes of the instance.
3866
3867     """
3868     env = _BuildInstanceHookEnvByObject(self, self.instance)
3869     env["MIGRATE_LIVE"] = self.op.live
3870     env["MIGRATE_CLEANUP"] = self.op.cleanup
3871     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3872     return env, nl, nl
3873
3874   def CheckPrereq(self):
3875     """Check prerequisites.
3876
3877     This checks that the instance is in the cluster.
3878
3879     """
3880     instance = self.cfg.GetInstanceInfo(
3881       self.cfg.ExpandInstanceName(self.op.instance_name))
3882     if instance is None:
3883       raise errors.OpPrereqError("Instance '%s' not known" %
3884                                  self.op.instance_name)
3885
3886     if instance.disk_template != constants.DT_DRBD8:
3887       raise errors.OpPrereqError("Instance's disk layout is not"
3888                                  " drbd8, cannot migrate.")
3889
3890     secondary_nodes = instance.secondary_nodes
3891     if not secondary_nodes:
3892       raise errors.ConfigurationError("No secondary node but using"
3893                                       " drbd8 disk template")
3894
3895     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3896
3897     target_node = secondary_nodes[0]
3898     # check memory requirements on the secondary node
3899     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3900                          instance.name, i_be[constants.BE_MEMORY],
3901                          instance.hypervisor)
3902
3903     # check bridge existence
3904     brlist = [nic.bridge for nic in instance.nics]
3905     result = self.rpc.call_bridges_exist(target_node, brlist)
3906     if result.failed or not result.data:
3907       raise errors.OpPrereqError("One or more target bridges %s does not"
3908                                  " exist on destination node '%s'" %
3909                                  (brlist, target_node))
3910
3911     if not self.op.cleanup:
3912       _CheckNodeNotDrained(self, target_node)
3913       result = self.rpc.call_instance_migratable(instance.primary_node,
3914                                                  instance)
3915       msg = result.RemoteFailMsg()
3916       if msg:
3917         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3918                                    msg)
3919
3920     self.instance = instance
3921
3922   def _WaitUntilSync(self):
3923     """Poll with custom rpc for disk sync.
3924
3925     This uses our own step-based rpc call.
3926
3927     """
3928     self.feedback_fn("* wait until resync is done")
3929     all_done = False
3930     while not all_done:
3931       all_done = True
3932       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3933                                             self.nodes_ip,
3934                                             self.instance.disks)
3935       min_percent = 100
3936       for node, nres in result.items():
3937         msg = nres.RemoteFailMsg()
3938         if msg:
3939           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3940                                    (node, msg))
3941         node_done, node_percent = nres.payload
3942         all_done = all_done and node_done
3943         if node_percent is not None:
3944           min_percent = min(min_percent, node_percent)
3945       if not all_done:
3946         if min_percent < 100:
3947           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3948         time.sleep(2)
3949
3950   def _EnsureSecondary(self, node):
3951     """Demote a node to secondary.
3952
3953     """
3954     self.feedback_fn("* switching node %s to secondary mode" % node)
3955
3956     for dev in self.instance.disks:
3957       self.cfg.SetDiskID(dev, node)
3958
3959     result = self.rpc.call_blockdev_close(node, self.instance.name,
3960                                           self.instance.disks)
3961     msg = result.RemoteFailMsg()
3962     if msg:
3963       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3964                                " error %s" % (node, msg))
3965
3966   def _GoStandalone(self):
3967     """Disconnect from the network.
3968
3969     """
3970     self.feedback_fn("* changing into standalone mode")
3971     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3972                                                self.instance.disks)
3973     for node, nres in result.items():
3974       msg = nres.RemoteFailMsg()
3975       if msg:
3976         raise errors.OpExecError("Cannot disconnect disks node %s,"
3977                                  " error %s" % (node, msg))
3978
3979   def _GoReconnect(self, multimaster):
3980     """Reconnect to the network.
3981
3982     """
3983     if multimaster:
3984       msg = "dual-master"
3985     else:
3986       msg = "single-master"
3987     self.feedback_fn("* changing disks into %s mode" % msg)
3988     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3989                                            self.instance.disks,
3990                                            self.instance.name, multimaster)
3991     for node, nres in result.items():
3992       msg = nres.RemoteFailMsg()
3993       if msg:
3994         raise errors.OpExecError("Cannot change disks config on node %s,"
3995                                  " error: %s" % (node, msg))
3996
3997   def _ExecCleanup(self):
3998     """Try to cleanup after a failed migration.
3999
4000     The cleanup is done by:
4001       - check that the instance is running only on one node
4002         (and update the config if needed)
4003       - change disks on its secondary node to secondary
4004       - wait until disks are fully synchronized
4005       - disconnect from the network
4006       - change disks into single-master mode
4007       - wait again until disks are fully synchronized
4008
4009     """
4010     instance = self.instance
4011     target_node = self.target_node
4012     source_node = self.source_node
4013
4014     # check running on only one node
4015     self.feedback_fn("* checking where the instance actually runs"
4016                      " (if this hangs, the hypervisor might be in"
4017                      " a bad state)")
4018     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
4019     for node, result in ins_l.items():
4020       result.Raise()
4021       if not isinstance(result.data, list):
4022         raise errors.OpExecError("Can't contact node '%s'" % node)
4023
4024     runningon_source = instance.name in ins_l[source_node].data
4025     runningon_target = instance.name in ins_l[target_node].data
4026
4027     if runningon_source and runningon_target:
4028       raise errors.OpExecError("Instance seems to be running on two nodes,"
4029                                " or the hypervisor is confused. You will have"
4030                                " to ensure manually that it runs only on one"
4031                                " and restart this operation.")
4032
4033     if not (runningon_source or runningon_target):
4034       raise errors.OpExecError("Instance does not seem to be running at all."
4035                                " In this case, it's safer to repair by"
4036                                " running 'gnt-instance stop' to ensure disk"
4037                                " shutdown, and then restarting it.")
4038
4039     if runningon_target:
4040       # the migration has actually succeeded, we need to update the config
4041       self.feedback_fn("* instance running on secondary node (%s),"
4042                        " updating config" % target_node)
4043       instance.primary_node = target_node
4044       self.cfg.Update(instance)
4045       demoted_node = source_node
4046     else:
4047       self.feedback_fn("* instance confirmed to be running on its"
4048                        " primary node (%s)" % source_node)
4049       demoted_node = target_node
4050
4051     self._EnsureSecondary(demoted_node)
4052     try:
4053       self._WaitUntilSync()
4054     except errors.OpExecError:
4055       # we ignore here errors, since if the device is standalone, it
4056       # won't be able to sync
4057       pass
4058     self._GoStandalone()
4059     self._GoReconnect(False)
4060     self._WaitUntilSync()
4061
4062     self.feedback_fn("* done")
4063
4064   def _RevertDiskStatus(self):
4065     """Try to revert the disk status after a failed migration.
4066
4067     """
4068     target_node = self.target_node
4069     try:
4070       self._EnsureSecondary(target_node)
4071       self._GoStandalone()
4072       self._GoReconnect(False)
4073       self._WaitUntilSync()
4074     except errors.OpExecError, err:
4075       self.LogWarning("Migration failed and I can't reconnect the"
4076                       " drives: error '%s'\n"
4077                       "Please look and recover the instance status" %
4078                       str(err))
4079
4080   def _AbortMigration(self):
4081     """Call the hypervisor code to abort a started migration.
4082
4083     """
4084     instance = self.instance
4085     target_node = self.target_node
4086     migration_info = self.migration_info
4087
4088     abort_result = self.rpc.call_finalize_migration(target_node,
4089                                                     instance,
4090                                                     migration_info,
4091                                                     False)
4092     abort_msg = abort_result.RemoteFailMsg()
4093     if abort_msg:
4094       logging.error("Aborting migration failed on target node %s: %s" %
4095                     (target_node, abort_msg))
4096       # Don't raise an exception here, as we stil have to try to revert the
4097       # disk status, even if this step failed.
4098
4099   def _ExecMigration(self):
4100     """Migrate an instance.
4101
4102     The migrate is done by:
4103       - change the disks into dual-master mode
4104       - wait until disks are fully synchronized again
4105       - migrate the instance
4106       - change disks on the new secondary node (the old primary) to secondary
4107       - wait until disks are fully synchronized
4108       - change disks into single-master mode
4109
4110     """
4111     instance = self.instance
4112     target_node = self.target_node
4113     source_node = self.source_node
4114
4115     self.feedback_fn("* checking disk consistency between source and target")
4116     for dev in instance.disks:
4117       if not _CheckDiskConsistency(self, dev, target_node, False):
4118         raise errors.OpExecError("Disk %s is degraded or not fully"
4119                                  " synchronized on target node,"
4120                                  " aborting migrate." % dev.iv_name)
4121
4122     # First get the migration information from the remote node
4123     result = self.rpc.call_migration_info(source_node, instance)
4124     msg = result.RemoteFailMsg()
4125     if msg:
4126       log_err = ("Failed fetching source migration information from %s: %s" %
4127                  (source_node, msg))
4128       logging.error(log_err)
4129       raise errors.OpExecError(log_err)
4130
4131     self.migration_info = migration_info = result.payload
4132
4133     # Then switch the disks to master/master mode
4134     self._EnsureSecondary(target_node)
4135     self._GoStandalone()
4136     self._GoReconnect(True)
4137     self._WaitUntilSync()
4138
4139     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4140     result = self.rpc.call_accept_instance(target_node,
4141                                            instance,
4142                                            migration_info,
4143                                            self.nodes_ip[target_node])
4144
4145     msg = result.RemoteFailMsg()
4146     if msg:
4147       logging.error("Instance pre-migration failed, trying to revert"
4148                     " disk status: %s", msg)
4149       self._AbortMigration()
4150       self._RevertDiskStatus()
4151       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4152                                (instance.name, msg))
4153
4154     self.feedback_fn("* migrating instance to %s" % target_node)
4155     time.sleep(10)
4156     result = self.rpc.call_instance_migrate(source_node, instance,
4157                                             self.nodes_ip[target_node],
4158                                             self.op.live)
4159     msg = result.RemoteFailMsg()
4160     if msg:
4161       logging.error("Instance migration failed, trying to revert"
4162                     " disk status: %s", msg)
4163       self._AbortMigration()
4164       self._RevertDiskStatus()
4165       raise errors.OpExecError("Could not migrate instance %s: %s" %
4166                                (instance.name, msg))
4167     time.sleep(10)
4168
4169     instance.primary_node = target_node
4170     # distribute new instance config to the other nodes
4171     self.cfg.Update(instance)
4172
4173     result = self.rpc.call_finalize_migration(target_node,
4174                                               instance,
4175                                               migration_info,
4176                                               True)
4177     msg = result.RemoteFailMsg()
4178     if msg:
4179       logging.error("Instance migration succeeded, but finalization failed:"
4180                     " %s" % msg)
4181       raise errors.OpExecError("Could not finalize instance migration: %s" %
4182                                msg)
4183
4184     self._EnsureSecondary(source_node)
4185     self._WaitUntilSync()
4186     self._GoStandalone()
4187     self._GoReconnect(False)
4188     self._WaitUntilSync()
4189
4190     self.feedback_fn("* done")
4191
4192   def Exec(self, feedback_fn):
4193     """Perform the migration.
4194
4195     """
4196     self.feedback_fn = feedback_fn
4197
4198     self.source_node = self.instance.primary_node
4199     self.target_node = self.instance.secondary_nodes[0]
4200     self.all_nodes = [self.source_node, self.target_node]
4201     self.nodes_ip = {
4202       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4203       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4204       }
4205     if self.op.cleanup:
4206       return self._ExecCleanup()
4207     else:
4208       return self._ExecMigration()
4209
4210
4211 def _CreateBlockDev(lu, node, instance, device, force_create,
4212                     info, force_open):
4213   """Create a tree of block devices on a given node.
4214
4215   If this device type has to be created on secondaries, create it and
4216   all its children.
4217
4218   If not, just recurse to children keeping the same 'force' value.
4219
4220   @param lu: the lu on whose behalf we execute
4221   @param node: the node on which to create the device
4222   @type instance: L{objects.Instance}
4223   @param instance: the instance which owns the device
4224   @type device: L{objects.Disk}
4225   @param device: the device to create
4226   @type force_create: boolean
4227   @param force_create: whether to force creation of this device; this
4228       will be change to True whenever we find a device which has
4229       CreateOnSecondary() attribute
4230   @param info: the extra 'metadata' we should attach to the device
4231       (this will be represented as a LVM tag)
4232   @type force_open: boolean
4233   @param force_open: this parameter will be passes to the
4234       L{backend.BlockdevCreate} function where it specifies
4235       whether we run on primary or not, and it affects both
4236       the child assembly and the device own Open() execution
4237
4238   """
4239   if device.CreateOnSecondary():
4240     force_create = True
4241
4242   if device.children:
4243     for child in device.children:
4244       _CreateBlockDev(lu, node, instance, child, force_create,
4245                       info, force_open)
4246
4247   if not force_create:
4248     return
4249
4250   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4251
4252
4253 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4254   """Create a single block device on a given node.
4255
4256   This will not recurse over children of the device, so they must be
4257   created in advance.
4258
4259   @param lu: the lu on whose behalf we execute
4260   @param node: the node on which to create the device
4261   @type instance: L{objects.Instance}
4262   @param instance: the instance which owns the device
4263   @type device: L{objects.Disk}
4264   @param device: the device to create
4265   @param info: the extra 'metadata' we should attach to the device
4266       (this will be represented as a LVM tag)
4267   @type force_open: boolean
4268   @param force_open: this parameter will be passes to the
4269       L{backend.BlockdevCreate} function where it specifies
4270       whether we run on primary or not, and it affects both
4271       the child assembly and the device own Open() execution
4272
4273   """
4274   lu.cfg.SetDiskID(device, node)
4275   result = lu.rpc.call_blockdev_create(node, device, device.size,
4276                                        instance.name, force_open, info)
4277   msg = result.RemoteFailMsg()
4278   if msg:
4279     raise errors.OpExecError("Can't create block device %s on"
4280                              " node %s for instance %s: %s" %
4281                              (device, node, instance.name, msg))
4282   if device.physical_id is None:
4283     device.physical_id = result.payload
4284
4285
4286 def _GenerateUniqueNames(lu, exts):
4287   """Generate a suitable LV name.
4288
4289   This will generate a logical volume name for the given instance.
4290
4291   """
4292   results = []
4293   for val in exts:
4294     new_id = lu.cfg.GenerateUniqueID()
4295     results.append("%s%s" % (new_id, val))
4296   return results
4297
4298
4299 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4300                          p_minor, s_minor):
4301   """Generate a drbd8 device complete with its children.
4302
4303   """
4304   port = lu.cfg.AllocatePort()
4305   vgname = lu.cfg.GetVGName()
4306   shared_secret = lu.cfg.GenerateDRBDSecret()
4307   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4308                           logical_id=(vgname, names[0]))
4309   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4310                           logical_id=(vgname, names[1]))
4311   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4312                           logical_id=(primary, secondary, port,
4313                                       p_minor, s_minor,
4314                                       shared_secret),
4315                           children=[dev_data, dev_meta],
4316                           iv_name=iv_name)
4317   return drbd_dev
4318
4319
4320 def _GenerateDiskTemplate(lu, template_name,
4321                           instance_name, primary_node,
4322                           secondary_nodes, disk_info,
4323                           file_storage_dir, file_driver,
4324                           base_index):
4325   """Generate the entire disk layout for a given template type.
4326
4327   """
4328   #TODO: compute space requirements
4329
4330   vgname = lu.cfg.GetVGName()
4331   disk_count = len(disk_info)
4332   disks = []
4333   if template_name == constants.DT_DISKLESS:
4334     pass
4335   elif template_name == constants.DT_PLAIN:
4336     if len(secondary_nodes) != 0:
4337       raise errors.ProgrammerError("Wrong template configuration")
4338
4339     names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4340                                       for i in range(disk_count)])
4341     for idx, disk in enumerate(disk_info):
4342       disk_index = idx + base_index
4343       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4344                               logical_id=(vgname, names[idx]),
4345                               iv_name="disk/%d" % disk_index,
4346                               mode=disk["mode"])
4347       disks.append(disk_dev)
4348   elif template_name == constants.DT_DRBD8:
4349     if len(secondary_nodes) != 1:
4350       raise errors.ProgrammerError("Wrong template configuration")
4351     remote_node = secondary_nodes[0]
4352     minors = lu.cfg.AllocateDRBDMinor(
4353       [primary_node, remote_node] * len(disk_info), instance_name)
4354
4355     names = []
4356     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
4357                                                for i in range(disk_count)]):
4358       names.append(lv_prefix + "_data")
4359       names.append(lv_prefix + "_meta")
4360     for idx, disk in enumerate(disk_info):
4361       disk_index = idx + base_index
4362       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4363                                       disk["size"], names[idx*2:idx*2+2],
4364                                       "disk/%d" % disk_index,
4365                                       minors[idx*2], minors[idx*2+1])
4366       disk_dev.mode = disk["mode"]
4367       disks.append(disk_dev)
4368   elif template_name == constants.DT_FILE:
4369     if len(secondary_nodes) != 0:
4370       raise errors.ProgrammerError("Wrong template configuration")
4371
4372     for idx, disk in enumerate(disk_info):
4373       disk_index = idx + base_index
4374       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4375                               iv_name="disk/%d" % disk_index,
4376                               logical_id=(file_driver,
4377                                           "%s/disk%d" % (file_storage_dir,
4378                                                          disk_index)),
4379                               mode=disk["mode"])
4380       disks.append(disk_dev)
4381   else:
4382     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4383   return disks
4384
4385
4386 def _GetInstanceInfoText(instance):
4387   """Compute that text that should be added to the disk's metadata.
4388
4389   """
4390   return "originstname+%s" % instance.name
4391
4392
4393 def _CreateDisks(lu, instance):
4394   """Create all disks for an instance.
4395
4396   This abstracts away some work from AddInstance.
4397
4398   @type lu: L{LogicalUnit}
4399   @param lu: the logical unit on whose behalf we execute
4400   @type instance: L{objects.Instance}
4401   @param instance: the instance whose disks we should create
4402   @rtype: boolean
4403   @return: the success of the creation
4404
4405   """
4406   info = _GetInstanceInfoText(instance)
4407   pnode = instance.primary_node
4408
4409   if instance.disk_template == constants.DT_FILE:
4410     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4411     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4412
4413     if result.failed or not result.data:
4414       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4415
4416     if not result.data[0]:
4417       raise errors.OpExecError("Failed to create directory '%s'" %
4418                                file_storage_dir)
4419
4420   # Note: this needs to be kept in sync with adding of disks in
4421   # LUSetInstanceParams
4422   for device in instance.disks:
4423     logging.info("Creating volume %s for instance %s",
4424                  device.iv_name, instance.name)
4425     #HARDCODE
4426     for node in instance.all_nodes:
4427       f_create = node == pnode
4428       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4429
4430
4431 def _RemoveDisks(lu, instance):
4432   """Remove all disks for an instance.
4433
4434   This abstracts away some work from `AddInstance()` and
4435   `RemoveInstance()`. Note that in case some of the devices couldn't
4436   be removed, the removal will continue with the other ones (compare
4437   with `_CreateDisks()`).
4438
4439   @type lu: L{LogicalUnit}
4440   @param lu: the logical unit on whose behalf we execute
4441   @type instance: L{objects.Instance}
4442   @param instance: the instance whose disks we should remove
4443   @rtype: boolean
4444   @return: the success of the removal
4445
4446   """
4447   logging.info("Removing block devices for instance %s", instance.name)
4448
4449   all_result = True
4450   for device in instance.disks:
4451     for node, disk in device.ComputeNodeTree(instance.primary_node):
4452       lu.cfg.SetDiskID(disk, node)
4453       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4454       if msg:
4455         lu.LogWarning("Could not remove block device %s on node %s,"
4456                       " continuing anyway: %s", device.iv_name, node, msg)
4457         all_result = False
4458
4459   if instance.disk_template == constants.DT_FILE:
4460     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4461     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4462                                                  file_storage_dir)
4463     if result.failed or not result.data:
4464       logging.error("Could not remove directory '%s'", file_storage_dir)
4465       all_result = False
4466
4467   return all_result
4468
4469
4470 def _ComputeDiskSize(disk_template, disks):
4471   """Compute disk size requirements in the volume group
4472
4473   """
4474   # Required free disk space as a function of disk and swap space
4475   req_size_dict = {
4476     constants.DT_DISKLESS: None,
4477     constants.DT_PLAIN: sum(d["size"] for d in disks),
4478     # 128 MB are added for drbd metadata for each disk
4479     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4480     constants.DT_FILE: None,
4481   }
4482
4483   if disk_template not in req_size_dict:
4484     raise errors.ProgrammerError("Disk template '%s' size requirement"
4485                                  " is unknown" %  disk_template)
4486
4487   return req_size_dict[disk_template]
4488
4489
4490 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4491   """Hypervisor parameter validation.
4492
4493   This function abstract the hypervisor parameter validation to be
4494   used in both instance create and instance modify.
4495
4496   @type lu: L{LogicalUnit}
4497   @param lu: the logical unit for which we check
4498   @type nodenames: list
4499   @param nodenames: the list of nodes on which we should check
4500   @type hvname: string
4501   @param hvname: the name of the hypervisor we should use
4502   @type hvparams: dict
4503   @param hvparams: the parameters which we need to check
4504   @raise errors.OpPrereqError: if the parameters are not valid
4505
4506   """
4507   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4508                                                   hvname,
4509                                                   hvparams)
4510   for node in nodenames:
4511     info = hvinfo[node]
4512     if info.offline:
4513       continue
4514     msg = info.RemoteFailMsg()
4515     if msg:
4516       raise errors.OpPrereqError("Hypervisor parameter validation"
4517                                  " failed on node %s: %s" % (node, msg))
4518
4519
4520 class LUCreateInstance(LogicalUnit):
4521   """Create an instance.
4522
4523   """
4524   HPATH = "instance-add"
4525   HTYPE = constants.HTYPE_INSTANCE
4526   _OP_REQP = ["instance_name", "disks", "disk_template",
4527               "mode", "start",
4528               "wait_for_sync", "ip_check", "nics",
4529               "hvparams", "beparams"]
4530   REQ_BGL = False
4531
4532   def _ExpandNode(self, node):
4533     """Expands and checks one node name.
4534
4535     """
4536     node_full = self.cfg.ExpandNodeName(node)
4537     if node_full is None:
4538       raise errors.OpPrereqError("Unknown node %s" % node)
4539     return node_full
4540
4541   def ExpandNames(self):
4542     """ExpandNames for CreateInstance.
4543
4544     Figure out the right locks for instance creation.
4545
4546     """
4547     self.needed_locks = {}
4548
4549     # set optional parameters to none if they don't exist
4550     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4551       if not hasattr(self.op, attr):
4552         setattr(self.op, attr, None)
4553
4554     # cheap checks, mostly valid constants given
4555
4556     # verify creation mode
4557     if self.op.mode not in (constants.INSTANCE_CREATE,
4558                             constants.INSTANCE_IMPORT):
4559       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4560                                  self.op.mode)
4561
4562     # disk template and mirror node verification
4563     if self.op.disk_template not in constants.DISK_TEMPLATES:
4564       raise errors.OpPrereqError("Invalid disk template name")
4565
4566     if self.op.hypervisor is None:
4567       self.op.hypervisor = self.cfg.GetHypervisorType()
4568
4569     cluster = self.cfg.GetClusterInfo()
4570     enabled_hvs = cluster.enabled_hypervisors
4571     if self.op.hypervisor not in enabled_hvs:
4572       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4573                                  " cluster (%s)" % (self.op.hypervisor,
4574                                   ",".join(enabled_hvs)))
4575
4576     # check hypervisor parameter syntax (locally)
4577     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4578     filled_hvp = cluster.FillDict(cluster.hvparams[self.op.hypervisor],
4579                                   self.op.hvparams)
4580     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4581     hv_type.CheckParameterSyntax(filled_hvp)
4582     self.hv_full = filled_hvp
4583
4584     # fill and remember the beparams dict
4585     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4586     self.be_full = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
4587                                     self.op.beparams)
4588
4589     #### instance parameters check
4590
4591     # instance name verification
4592     hostname1 = utils.HostInfo(self.op.instance_name)
4593     self.op.instance_name = instance_name = hostname1.name
4594
4595     # this is just a preventive check, but someone might still add this
4596     # instance in the meantime, and creation will fail at lock-add time
4597     if instance_name in self.cfg.GetInstanceList():
4598       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4599                                  instance_name)
4600
4601     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4602
4603     # NIC buildup
4604     self.nics = []
4605     for nic in self.op.nics:
4606       # ip validity checks
4607       ip = nic.get("ip", None)
4608       if ip is None or ip.lower() == "none":
4609         nic_ip = None
4610       elif ip.lower() == constants.VALUE_AUTO:
4611         nic_ip = hostname1.ip
4612       else:
4613         if not utils.IsValidIP(ip):
4614           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4615                                      " like a valid IP" % ip)
4616         nic_ip = ip
4617
4618       # MAC address verification
4619       mac = nic.get("mac", constants.VALUE_AUTO)
4620       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4621         if not utils.IsValidMac(mac.lower()):
4622           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4623                                      mac)
4624         else:
4625           # or validate/reserve the current one
4626           if self.cfg.IsMacInUse(mac):
4627             raise errors.OpPrereqError("MAC address %s already in use"
4628                                        " in cluster" % mac)
4629
4630       # bridge verification
4631       bridge = nic.get("bridge", None)
4632       if bridge is None:
4633         bridge = self.cfg.GetDefBridge()
4634       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, bridge=bridge))
4635
4636     # disk checks/pre-build
4637     self.disks = []
4638     for disk in self.op.disks:
4639       mode = disk.get("mode", constants.DISK_RDWR)
4640       if mode not in constants.DISK_ACCESS_SET:
4641         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4642                                    mode)
4643       size = disk.get("size", None)
4644       if size is None:
4645         raise errors.OpPrereqError("Missing disk size")
4646       try:
4647         size = int(size)
4648       except ValueError:
4649         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4650       self.disks.append({"size": size, "mode": mode})
4651
4652     # used in CheckPrereq for ip ping check
4653     self.check_ip = hostname1.ip
4654
4655     # file storage checks
4656     if (self.op.file_driver and
4657         not self.op.file_driver in constants.FILE_DRIVER):
4658       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4659                                  self.op.file_driver)
4660
4661     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4662       raise errors.OpPrereqError("File storage directory path not absolute")
4663
4664     ### Node/iallocator related checks
4665     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4666       raise errors.OpPrereqError("One and only one of iallocator and primary"
4667                                  " node must be given")
4668
4669     if self.op.iallocator:
4670       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4671     else:
4672       self.op.pnode = self._ExpandNode(self.op.pnode)
4673       nodelist = [self.op.pnode]
4674       if self.op.snode is not None:
4675         self.op.snode = self._ExpandNode(self.op.snode)
4676         nodelist.append(self.op.snode)
4677       self.needed_locks[locking.LEVEL_NODE] = nodelist
4678
4679     # in case of import lock the source node too
4680     if self.op.mode == constants.INSTANCE_IMPORT:
4681       src_node = getattr(self.op, "src_node", None)
4682       src_path = getattr(self.op, "src_path", None)
4683
4684       if src_path is None:
4685         self.op.src_path = src_path = self.op.instance_name
4686
4687       if src_node is None:
4688         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4689         self.op.src_node = None
4690         if os.path.isabs(src_path):
4691           raise errors.OpPrereqError("Importing an instance from an absolute"
4692                                      " path requires a source node option.")
4693       else:
4694         self.op.src_node = src_node = self._ExpandNode(src_node)
4695         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4696           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4697         if not os.path.isabs(src_path):
4698           self.op.src_path = src_path = \
4699             os.path.join(constants.EXPORT_DIR, src_path)
4700
4701     else: # INSTANCE_CREATE
4702       if getattr(self.op, "os_type", None) is None:
4703         raise errors.OpPrereqError("No guest OS specified")
4704
4705   def _RunAllocator(self):
4706     """Run the allocator based on input opcode.
4707
4708     """
4709     nics = [n.ToDict() for n in self.nics]
4710     ial = IAllocator(self,
4711                      mode=constants.IALLOCATOR_MODE_ALLOC,
4712                      name=self.op.instance_name,
4713                      disk_template=self.op.disk_template,
4714                      tags=[],
4715                      os=self.op.os_type,
4716                      vcpus=self.be_full[constants.BE_VCPUS],
4717                      mem_size=self.be_full[constants.BE_MEMORY],
4718                      disks=self.disks,
4719                      nics=nics,
4720                      hypervisor=self.op.hypervisor,
4721                      )
4722
4723     ial.Run(self.op.iallocator)
4724
4725     if not ial.success:
4726       raise errors.OpPrereqError("Can't compute nodes using"
4727                                  " iallocator '%s': %s" % (self.op.iallocator,
4728                                                            ial.info))
4729     if len(ial.nodes) != ial.required_nodes:
4730       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4731                                  " of nodes (%s), required %s" %
4732                                  (self.op.iallocator, len(ial.nodes),
4733                                   ial.required_nodes))
4734     self.op.pnode = ial.nodes[0]
4735     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4736                  self.op.instance_name, self.op.iallocator,
4737                  ", ".join(ial.nodes))
4738     if ial.required_nodes == 2:
4739       self.op.snode = ial.nodes[1]
4740
4741   def BuildHooksEnv(self):
4742     """Build hooks env.
4743
4744     This runs on master, primary and secondary nodes of the instance.
4745
4746     """
4747     env = {
4748       "ADD_MODE": self.op.mode,
4749       }
4750     if self.op.mode == constants.INSTANCE_IMPORT:
4751       env["SRC_NODE"] = self.op.src_node
4752       env["SRC_PATH"] = self.op.src_path
4753       env["SRC_IMAGES"] = self.src_images
4754
4755     env.update(_BuildInstanceHookEnv(
4756       name=self.op.instance_name,
4757       primary_node=self.op.pnode,
4758       secondary_nodes=self.secondaries,
4759       status=self.op.start,
4760       os_type=self.op.os_type,
4761       memory=self.be_full[constants.BE_MEMORY],
4762       vcpus=self.be_full[constants.BE_VCPUS],
4763       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
4764       disk_template=self.op.disk_template,
4765       disks=[(d["size"], d["mode"]) for d in self.disks],
4766       bep=self.be_full,
4767       hvp=self.hv_full,
4768       hypervisor_name=self.op.hypervisor,
4769     ))
4770
4771     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4772           self.secondaries)
4773     return env, nl, nl
4774
4775
4776   def CheckPrereq(self):
4777     """Check prerequisites.
4778
4779     """
4780     if (not self.cfg.GetVGName() and
4781         self.op.disk_template not in constants.DTS_NOT_LVM):
4782       raise errors.OpPrereqError("Cluster does not support lvm-based"
4783                                  " instances")
4784
4785     if self.op.mode == constants.INSTANCE_IMPORT:
4786       src_node = self.op.src_node
4787       src_path = self.op.src_path
4788
4789       if src_node is None:
4790         exp_list = self.rpc.call_export_list(
4791           self.acquired_locks[locking.LEVEL_NODE])
4792         found = False
4793         for node in exp_list:
4794           if not exp_list[node].failed and src_path in exp_list[node].data:
4795             found = True
4796             self.op.src_node = src_node = node
4797             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4798                                                        src_path)
4799             break
4800         if not found:
4801           raise errors.OpPrereqError("No export found for relative path %s" %
4802                                       src_path)
4803
4804       _CheckNodeOnline(self, src_node)
4805       result = self.rpc.call_export_info(src_node, src_path)
4806       result.Raise()
4807       if not result.data:
4808         raise errors.OpPrereqError("No export found in dir %s" % src_path)
4809
4810       export_info = result.data
4811       if not export_info.has_section(constants.INISECT_EXP):
4812         raise errors.ProgrammerError("Corrupted export config")
4813
4814       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4815       if (int(ei_version) != constants.EXPORT_VERSION):
4816         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4817                                    (ei_version, constants.EXPORT_VERSION))
4818
4819       # Check that the new instance doesn't have less disks than the export
4820       instance_disks = len(self.disks)
4821       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4822       if instance_disks < export_disks:
4823         raise errors.OpPrereqError("Not enough disks to import."
4824                                    " (instance: %d, export: %d)" %
4825                                    (instance_disks, export_disks))
4826
4827       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4828       disk_images = []
4829       for idx in range(export_disks):
4830         option = 'disk%d_dump' % idx
4831         if export_info.has_option(constants.INISECT_INS, option):
4832           # FIXME: are the old os-es, disk sizes, etc. useful?
4833           export_name = export_info.get(constants.INISECT_INS, option)
4834           image = os.path.join(src_path, export_name)
4835           disk_images.append(image)
4836         else:
4837           disk_images.append(False)
4838
4839       self.src_images = disk_images
4840
4841       old_name = export_info.get(constants.INISECT_INS, 'name')
4842       # FIXME: int() here could throw a ValueError on broken exports
4843       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4844       if self.op.instance_name == old_name:
4845         for idx, nic in enumerate(self.nics):
4846           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4847             nic_mac_ini = 'nic%d_mac' % idx
4848             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4849
4850     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4851     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4852     if self.op.start and not self.op.ip_check:
4853       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4854                                  " adding an instance in start mode")
4855
4856     if self.op.ip_check:
4857       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4858         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4859                                    (self.check_ip, self.op.instance_name))
4860
4861     #### mac address generation
4862     # By generating here the mac address both the allocator and the hooks get
4863     # the real final mac address rather than the 'auto' or 'generate' value.
4864     # There is a race condition between the generation and the instance object
4865     # creation, which means that we know the mac is valid now, but we're not
4866     # sure it will be when we actually add the instance. If things go bad
4867     # adding the instance will abort because of a duplicate mac, and the
4868     # creation job will fail.
4869     for nic in self.nics:
4870       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4871         nic.mac = self.cfg.GenerateMAC()
4872
4873     #### allocator run
4874
4875     if self.op.iallocator is not None:
4876       self._RunAllocator()
4877
4878     #### node related checks
4879
4880     # check primary node
4881     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4882     assert self.pnode is not None, \
4883       "Cannot retrieve locked node %s" % self.op.pnode
4884     if pnode.offline:
4885       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4886                                  pnode.name)
4887     if pnode.drained:
4888       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4889                                  pnode.name)
4890
4891     self.secondaries = []
4892
4893     # mirror node verification
4894     if self.op.disk_template in constants.DTS_NET_MIRROR:
4895       if self.op.snode is None:
4896         raise errors.OpPrereqError("The networked disk templates need"
4897                                    " a mirror node")
4898       if self.op.snode == pnode.name:
4899         raise errors.OpPrereqError("The secondary node cannot be"
4900                                    " the primary node.")
4901       _CheckNodeOnline(self, self.op.snode)
4902       _CheckNodeNotDrained(self, self.op.snode)
4903       self.secondaries.append(self.op.snode)
4904
4905     nodenames = [pnode.name] + self.secondaries
4906
4907     req_size = _ComputeDiskSize(self.op.disk_template,
4908                                 self.disks)
4909
4910     # Check lv size requirements
4911     if req_size is not None:
4912       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4913                                          self.op.hypervisor)
4914       for node in nodenames:
4915         info = nodeinfo[node]
4916         info.Raise()
4917         info = info.data
4918         if not info:
4919           raise errors.OpPrereqError("Cannot get current information"
4920                                      " from node '%s'" % node)
4921         vg_free = info.get('vg_free', None)
4922         if not isinstance(vg_free, int):
4923           raise errors.OpPrereqError("Can't compute free disk space on"
4924                                      " node %s" % node)
4925         if req_size > info['vg_free']:
4926           raise errors.OpPrereqError("Not enough disk space on target node %s."
4927                                      " %d MB available, %d MB required" %
4928                                      (node, info['vg_free'], req_size))
4929
4930     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4931
4932     # os verification
4933     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4934     result.Raise()
4935     if not isinstance(result.data, objects.OS) or not result.data:
4936       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4937                                  " primary node"  % self.op.os_type)
4938
4939     # bridge check on primary node
4940     bridges = [n.bridge for n in self.nics]
4941     result = self.rpc.call_bridges_exist(self.pnode.name, bridges)
4942     result.Raise()
4943     if not result.data:
4944       raise errors.OpPrereqError("One of the target bridges '%s' does not"
4945                                  " exist on destination node '%s'" %
4946                                  (",".join(bridges), pnode.name))
4947
4948     # memory check on primary node
4949     if self.op.start:
4950       _CheckNodeFreeMemory(self, self.pnode.name,
4951                            "creating instance %s" % self.op.instance_name,
4952                            self.be_full[constants.BE_MEMORY],
4953                            self.op.hypervisor)
4954
4955   def Exec(self, feedback_fn):
4956     """Create and add the instance to the cluster.
4957
4958     """
4959     instance = self.op.instance_name
4960     pnode_name = self.pnode.name
4961
4962     ht_kind = self.op.hypervisor
4963     if ht_kind in constants.HTS_REQ_PORT:
4964       network_port = self.cfg.AllocatePort()
4965     else:
4966       network_port = None
4967
4968     ##if self.op.vnc_bind_address is None:
4969     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4970
4971     # this is needed because os.path.join does not accept None arguments
4972     if self.op.file_storage_dir is None:
4973       string_file_storage_dir = ""
4974     else:
4975       string_file_storage_dir = self.op.file_storage_dir
4976
4977     # build the full file storage dir path
4978     file_storage_dir = os.path.normpath(os.path.join(
4979                                         self.cfg.GetFileStorageDir(),
4980                                         string_file_storage_dir, instance))
4981
4982
4983     disks = _GenerateDiskTemplate(self,
4984                                   self.op.disk_template,
4985                                   instance, pnode_name,
4986                                   self.secondaries,
4987                                   self.disks,
4988                                   file_storage_dir,
4989                                   self.op.file_driver,
4990                                   0)
4991
4992     iobj = objects.Instance(name=instance, os=self.op.os_type,
4993                             primary_node=pnode_name,
4994                             nics=self.nics, disks=disks,
4995                             disk_template=self.op.disk_template,
4996                             admin_up=False,
4997                             network_port=network_port,
4998                             beparams=self.op.beparams,
4999                             hvparams=self.op.hvparams,
5000                             hypervisor=self.op.hypervisor,
5001                             )
5002
5003     feedback_fn("* creating instance disks...")
5004     try:
5005       _CreateDisks(self, iobj)
5006     except errors.OpExecError:
5007       self.LogWarning("Device creation failed, reverting...")
5008       try:
5009         _RemoveDisks(self, iobj)
5010       finally:
5011         self.cfg.ReleaseDRBDMinors(instance)
5012         raise
5013
5014     feedback_fn("adding instance %s to cluster config" % instance)
5015
5016     self.cfg.AddInstance(iobj)
5017     # Declare that we don't want to remove the instance lock anymore, as we've
5018     # added the instance to the config
5019     del self.remove_locks[locking.LEVEL_INSTANCE]
5020     # Unlock all the nodes
5021     if self.op.mode == constants.INSTANCE_IMPORT:
5022       nodes_keep = [self.op.src_node]
5023       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
5024                        if node != self.op.src_node]
5025       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
5026       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
5027     else:
5028       self.context.glm.release(locking.LEVEL_NODE)
5029       del self.acquired_locks[locking.LEVEL_NODE]
5030
5031     if self.op.wait_for_sync:
5032       disk_abort = not _WaitForSync(self, iobj)
5033     elif iobj.disk_template in constants.DTS_NET_MIRROR:
5034       # make sure the disks are not degraded (still sync-ing is ok)
5035       time.sleep(15)
5036       feedback_fn("* checking mirrors status")
5037       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
5038     else:
5039       disk_abort = False
5040
5041     if disk_abort:
5042       _RemoveDisks(self, iobj)
5043       self.cfg.RemoveInstance(iobj.name)
5044       # Make sure the instance lock gets removed
5045       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
5046       raise errors.OpExecError("There are some degraded disks for"
5047                                " this instance")
5048
5049     feedback_fn("creating os for instance %s on node %s" %
5050                 (instance, pnode_name))
5051
5052     if iobj.disk_template != constants.DT_DISKLESS:
5053       if self.op.mode == constants.INSTANCE_CREATE:
5054         feedback_fn("* running the instance OS create scripts...")
5055         result = self.rpc.call_instance_os_add(pnode_name, iobj)
5056         msg = result.RemoteFailMsg()
5057         if msg:
5058           raise errors.OpExecError("Could not add os for instance %s"
5059                                    " on node %s: %s" %
5060                                    (instance, pnode_name, msg))
5061
5062       elif self.op.mode == constants.INSTANCE_IMPORT:
5063         feedback_fn("* running the instance OS import scripts...")
5064         src_node = self.op.src_node
5065         src_images = self.src_images
5066         cluster_name = self.cfg.GetClusterName()
5067         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
5068                                                          src_node, src_images,
5069                                                          cluster_name)
5070         import_result.Raise()
5071         for idx, result in enumerate(import_result.data):
5072           if not result:
5073             self.LogWarning("Could not import the image %s for instance"
5074                             " %s, disk %d, on node %s" %
5075                             (src_images[idx], instance, idx, pnode_name))
5076       else:
5077         # also checked in the prereq part
5078         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
5079                                      % self.op.mode)
5080
5081     if self.op.start:
5082       iobj.admin_up = True
5083       self.cfg.Update(iobj)
5084       logging.info("Starting instance %s on node %s", instance, pnode_name)
5085       feedback_fn("* starting instance...")
5086       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
5087       msg = result.RemoteFailMsg()
5088       if msg:
5089         raise errors.OpExecError("Could not start instance: %s" % msg)
5090
5091
5092 class LUConnectConsole(NoHooksLU):
5093   """Connect to an instance's console.
5094
5095   This is somewhat special in that it returns the command line that
5096   you need to run on the master node in order to connect to the
5097   console.
5098
5099   """
5100   _OP_REQP = ["instance_name"]
5101   REQ_BGL = False
5102
5103   def ExpandNames(self):
5104     self._ExpandAndLockInstance()
5105
5106   def CheckPrereq(self):
5107     """Check prerequisites.
5108
5109     This checks that the instance is in the cluster.
5110
5111     """
5112     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5113     assert self.instance is not None, \
5114       "Cannot retrieve locked instance %s" % self.op.instance_name
5115     _CheckNodeOnline(self, self.instance.primary_node)
5116
5117   def Exec(self, feedback_fn):
5118     """Connect to the console of an instance
5119
5120     """
5121     instance = self.instance
5122     node = instance.primary_node
5123
5124     node_insts = self.rpc.call_instance_list([node],
5125                                              [instance.hypervisor])[node]
5126     node_insts.Raise()
5127
5128     if instance.name not in node_insts.data:
5129       raise errors.OpExecError("Instance %s is not running." % instance.name)
5130
5131     logging.debug("Connecting to console of %s on %s", instance.name, node)
5132
5133     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5134     cluster = self.cfg.GetClusterInfo()
5135     # beparams and hvparams are passed separately, to avoid editing the
5136     # instance and then saving the defaults in the instance itself.
5137     hvparams = cluster.FillHV(instance)
5138     beparams = cluster.FillBE(instance)
5139     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5140
5141     # build ssh cmdline
5142     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5143
5144
5145 class LUReplaceDisks(LogicalUnit):
5146   """Replace the disks of an instance.
5147
5148   """
5149   HPATH = "mirrors-replace"
5150   HTYPE = constants.HTYPE_INSTANCE
5151   _OP_REQP = ["instance_name", "mode", "disks"]
5152   REQ_BGL = False
5153
5154   def CheckArguments(self):
5155     if not hasattr(self.op, "remote_node"):
5156       self.op.remote_node = None
5157     if not hasattr(self.op, "iallocator"):
5158       self.op.iallocator = None
5159
5160     # check for valid parameter combination
5161     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5162     if self.op.mode == constants.REPLACE_DISK_CHG:
5163       if cnt == 2:
5164         raise errors.OpPrereqError("When changing the secondary either an"
5165                                    " iallocator script must be used or the"
5166                                    " new node given")
5167       elif cnt == 0:
5168         raise errors.OpPrereqError("Give either the iallocator or the new"
5169                                    " secondary, not both")
5170     else: # not replacing the secondary
5171       if cnt != 2:
5172         raise errors.OpPrereqError("The iallocator and new node options can"
5173                                    " be used only when changing the"
5174                                    " secondary node")
5175
5176   def ExpandNames(self):
5177     self._ExpandAndLockInstance()
5178
5179     if self.op.iallocator is not None:
5180       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5181     elif self.op.remote_node is not None:
5182       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5183       if remote_node is None:
5184         raise errors.OpPrereqError("Node '%s' not known" %
5185                                    self.op.remote_node)
5186       self.op.remote_node = remote_node
5187       # Warning: do not remove the locking of the new secondary here
5188       # unless DRBD8.AddChildren is changed to work in parallel;
5189       # currently it doesn't since parallel invocations of
5190       # FindUnusedMinor will conflict
5191       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5192       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5193     else:
5194       self.needed_locks[locking.LEVEL_NODE] = []
5195       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5196
5197   def DeclareLocks(self, level):
5198     # If we're not already locking all nodes in the set we have to declare the
5199     # instance's primary/secondary nodes.
5200     if (level == locking.LEVEL_NODE and
5201         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5202       self._LockInstancesNodes()
5203
5204   def _RunAllocator(self):
5205     """Compute a new secondary node using an IAllocator.
5206
5207     """
5208     ial = IAllocator(self,
5209                      mode=constants.IALLOCATOR_MODE_RELOC,
5210                      name=self.op.instance_name,
5211                      relocate_from=[self.sec_node])
5212
5213     ial.Run(self.op.iallocator)
5214
5215     if not ial.success:
5216       raise errors.OpPrereqError("Can't compute nodes using"
5217                                  " iallocator '%s': %s" % (self.op.iallocator,
5218                                                            ial.info))
5219     if len(ial.nodes) != ial.required_nodes:
5220       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5221                                  " of nodes (%s), required %s" %
5222                                  (len(ial.nodes), ial.required_nodes))
5223     self.op.remote_node = ial.nodes[0]
5224     self.LogInfo("Selected new secondary for the instance: %s",
5225                  self.op.remote_node)
5226
5227   def BuildHooksEnv(self):
5228     """Build hooks env.
5229
5230     This runs on the master, the primary and all the secondaries.
5231
5232     """
5233     env = {
5234       "MODE": self.op.mode,
5235       "NEW_SECONDARY": self.op.remote_node,
5236       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5237       }
5238     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5239     nl = [
5240       self.cfg.GetMasterNode(),
5241       self.instance.primary_node,
5242       ]
5243     if self.op.remote_node is not None:
5244       nl.append(self.op.remote_node)
5245     return env, nl, nl
5246
5247   def CheckPrereq(self):
5248     """Check prerequisites.
5249
5250     This checks that the instance is in the cluster.
5251
5252     """
5253     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5254     assert instance is not None, \
5255       "Cannot retrieve locked instance %s" % self.op.instance_name
5256     self.instance = instance
5257
5258     if instance.disk_template != constants.DT_DRBD8:
5259       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5260                                  " instances")
5261
5262     if len(instance.secondary_nodes) != 1:
5263       raise errors.OpPrereqError("The instance has a strange layout,"
5264                                  " expected one secondary but found %d" %
5265                                  len(instance.secondary_nodes))
5266
5267     self.sec_node = instance.secondary_nodes[0]
5268
5269     if self.op.iallocator is not None:
5270       self._RunAllocator()
5271
5272     remote_node = self.op.remote_node
5273     if remote_node is not None:
5274       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5275       assert self.remote_node_info is not None, \
5276         "Cannot retrieve locked node %s" % remote_node
5277     else:
5278       self.remote_node_info = None
5279     if remote_node == instance.primary_node:
5280       raise errors.OpPrereqError("The specified node is the primary node of"
5281                                  " the instance.")
5282     elif remote_node == self.sec_node:
5283       raise errors.OpPrereqError("The specified node is already the"
5284                                  " secondary node of the instance.")
5285
5286     if self.op.mode == constants.REPLACE_DISK_PRI:
5287       n1 = self.tgt_node = instance.primary_node
5288       n2 = self.oth_node = self.sec_node
5289     elif self.op.mode == constants.REPLACE_DISK_SEC:
5290       n1 = self.tgt_node = self.sec_node
5291       n2 = self.oth_node = instance.primary_node
5292     elif self.op.mode == constants.REPLACE_DISK_CHG:
5293       n1 = self.new_node = remote_node
5294       n2 = self.oth_node = instance.primary_node
5295       self.tgt_node = self.sec_node
5296       _CheckNodeNotDrained(self, remote_node)
5297     else:
5298       raise errors.ProgrammerError("Unhandled disk replace mode")
5299
5300     _CheckNodeOnline(self, n1)
5301     _CheckNodeOnline(self, n2)
5302
5303     if not self.op.disks:
5304       self.op.disks = range(len(instance.disks))
5305
5306     for disk_idx in self.op.disks:
5307       instance.FindDisk(disk_idx)
5308
5309   def _ExecD8DiskOnly(self, feedback_fn):
5310     """Replace a disk on the primary or secondary for dbrd8.
5311
5312     The algorithm for replace is quite complicated:
5313
5314       1. for each disk to be replaced:
5315
5316         1. create new LVs on the target node with unique names
5317         1. detach old LVs from the drbd device
5318         1. rename old LVs to name_replaced.<time_t>
5319         1. rename new LVs to old LVs
5320         1. attach the new LVs (with the old names now) to the drbd device
5321
5322       1. wait for sync across all devices
5323
5324       1. for each modified disk:
5325
5326         1. remove old LVs (which have the name name_replaces.<time_t>)
5327
5328     Failures are not very well handled.
5329
5330     """
5331     steps_total = 6
5332     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5333     instance = self.instance
5334     iv_names = {}
5335     vgname = self.cfg.GetVGName()
5336     # start of work
5337     cfg = self.cfg
5338     tgt_node = self.tgt_node
5339     oth_node = self.oth_node
5340
5341     # Step: check device activation
5342     self.proc.LogStep(1, steps_total, "check device existence")
5343     info("checking volume groups")
5344     my_vg = cfg.GetVGName()
5345     results = self.rpc.call_vg_list([oth_node, tgt_node])
5346     if not results:
5347       raise errors.OpExecError("Can't list volume groups on the nodes")
5348     for node in oth_node, tgt_node:
5349       res = results[node]
5350       if res.failed or not res.data or my_vg not in res.data:
5351         raise errors.OpExecError("Volume group '%s' not found on %s" %
5352                                  (my_vg, node))
5353     for idx, dev in enumerate(instance.disks):
5354       if idx not in self.op.disks:
5355         continue
5356       for node in tgt_node, oth_node:
5357         info("checking disk/%d on %s" % (idx, node))
5358         cfg.SetDiskID(dev, node)
5359         result = self.rpc.call_blockdev_find(node, dev)
5360         msg = result.RemoteFailMsg()
5361         if not msg and not result.payload:
5362           msg = "disk not found"
5363         if msg:
5364           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5365                                    (idx, node, msg))
5366
5367     # Step: check other node consistency
5368     self.proc.LogStep(2, steps_total, "check peer consistency")
5369     for idx, dev in enumerate(instance.disks):
5370       if idx not in self.op.disks:
5371         continue
5372       info("checking disk/%d consistency on %s" % (idx, oth_node))
5373       if not _CheckDiskConsistency(self, dev, oth_node,
5374                                    oth_node==instance.primary_node):
5375         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5376                                  " to replace disks on this node (%s)" %
5377                                  (oth_node, tgt_node))
5378
5379     # Step: create new storage
5380     self.proc.LogStep(3, steps_total, "allocate new storage")
5381     for idx, dev in enumerate(instance.disks):
5382       if idx not in self.op.disks:
5383         continue
5384       size = dev.size
5385       cfg.SetDiskID(dev, tgt_node)
5386       lv_names = [".disk%d_%s" % (idx, suf)
5387                   for suf in ["data", "meta"]]
5388       names = _GenerateUniqueNames(self, lv_names)
5389       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5390                              logical_id=(vgname, names[0]))
5391       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5392                              logical_id=(vgname, names[1]))
5393       new_lvs = [lv_data, lv_meta]
5394       old_lvs = dev.children
5395       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5396       info("creating new local storage on %s for %s" %
5397            (tgt_node, dev.iv_name))
5398       # we pass force_create=True to force the LVM creation
5399       for new_lv in new_lvs:
5400         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5401                         _GetInstanceInfoText(instance), False)
5402
5403     # Step: for each lv, detach+rename*2+attach
5404     self.proc.LogStep(4, steps_total, "change drbd configuration")
5405     for dev, old_lvs, new_lvs in iv_names.itervalues():
5406       info("detaching %s drbd from local storage" % dev.iv_name)
5407       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5408       result.Raise()
5409       if not result.data:
5410         raise errors.OpExecError("Can't detach drbd from local storage on node"
5411                                  " %s for device %s" % (tgt_node, dev.iv_name))
5412       #dev.children = []
5413       #cfg.Update(instance)
5414
5415       # ok, we created the new LVs, so now we know we have the needed
5416       # storage; as such, we proceed on the target node to rename
5417       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5418       # using the assumption that logical_id == physical_id (which in
5419       # turn is the unique_id on that node)
5420
5421       # FIXME(iustin): use a better name for the replaced LVs
5422       temp_suffix = int(time.time())
5423       ren_fn = lambda d, suff: (d.physical_id[0],
5424                                 d.physical_id[1] + "_replaced-%s" % suff)
5425       # build the rename list based on what LVs exist on the node
5426       rlist = []
5427       for to_ren in old_lvs:
5428         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5429         if not result.RemoteFailMsg() and result.payload:
5430           # device exists
5431           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5432
5433       info("renaming the old LVs on the target node")
5434       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5435       result.Raise()
5436       if not result.data:
5437         raise errors.OpExecError("Can't rename old LVs on node %s" % tgt_node)
5438       # now we rename the new LVs to the old LVs
5439       info("renaming the new LVs on the target node")
5440       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5441       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5442       result.Raise()
5443       if not result.data:
5444         raise errors.OpExecError("Can't rename new LVs on node %s" % tgt_node)
5445
5446       for old, new in zip(old_lvs, new_lvs):
5447         new.logical_id = old.logical_id
5448         cfg.SetDiskID(new, tgt_node)
5449
5450       for disk in old_lvs:
5451         disk.logical_id = ren_fn(disk, temp_suffix)
5452         cfg.SetDiskID(disk, tgt_node)
5453
5454       # now that the new lvs have the old name, we can add them to the device
5455       info("adding new mirror component on %s" % tgt_node)
5456       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5457       if result.failed or not result.data:
5458         for new_lv in new_lvs:
5459           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5460           if msg:
5461             warning("Can't rollback device %s: %s", dev, msg,
5462                     hint="cleanup manually the unused logical volumes")
5463         raise errors.OpExecError("Can't add local storage to drbd")
5464
5465       dev.children = new_lvs
5466       cfg.Update(instance)
5467
5468     # Step: wait for sync
5469
5470     # this can fail as the old devices are degraded and _WaitForSync
5471     # does a combined result over all disks, so we don't check its
5472     # return value
5473     self.proc.LogStep(5, steps_total, "sync devices")
5474     _WaitForSync(self, instance, unlock=True)
5475
5476     # so check manually all the devices
5477     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5478       cfg.SetDiskID(dev, instance.primary_node)
5479       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5480       msg = result.RemoteFailMsg()
5481       if not msg and not result.payload:
5482         msg = "disk not found"
5483       if msg:
5484         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5485                                  (name, msg))
5486       if result.payload[5]:
5487         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5488
5489     # Step: remove old storage
5490     self.proc.LogStep(6, steps_total, "removing old storage")
5491     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5492       info("remove logical volumes for %s" % name)
5493       for lv in old_lvs:
5494         cfg.SetDiskID(lv, tgt_node)
5495         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5496         if msg:
5497           warning("Can't remove old LV: %s" % msg,
5498                   hint="manually remove unused LVs")
5499           continue
5500
5501   def _ExecD8Secondary(self, feedback_fn):
5502     """Replace the secondary node for drbd8.
5503
5504     The algorithm for replace is quite complicated:
5505       - for all disks of the instance:
5506         - create new LVs on the new node with same names
5507         - shutdown the drbd device on the old secondary
5508         - disconnect the drbd network on the primary
5509         - create the drbd device on the new secondary
5510         - network attach the drbd on the primary, using an artifice:
5511           the drbd code for Attach() will connect to the network if it
5512           finds a device which is connected to the good local disks but
5513           not network enabled
5514       - wait for sync across all devices
5515       - remove all disks from the old secondary
5516
5517     Failures are not very well handled.
5518
5519     """
5520     steps_total = 6
5521     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5522     instance = self.instance
5523     iv_names = {}
5524     # start of work
5525     cfg = self.cfg
5526     old_node = self.tgt_node
5527     new_node = self.new_node
5528     pri_node = instance.primary_node
5529     nodes_ip = {
5530       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5531       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5532       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5533       }
5534
5535     # Step: check device activation
5536     self.proc.LogStep(1, steps_total, "check device existence")
5537     info("checking volume groups")
5538     my_vg = cfg.GetVGName()
5539     results = self.rpc.call_vg_list([pri_node, new_node])
5540     for node in pri_node, new_node:
5541       res = results[node]
5542       if res.failed or not res.data or my_vg not in res.data:
5543         raise errors.OpExecError("Volume group '%s' not found on %s" %
5544                                  (my_vg, node))
5545     for idx, dev in enumerate(instance.disks):
5546       if idx not in self.op.disks:
5547         continue
5548       info("checking disk/%d on %s" % (idx, pri_node))
5549       cfg.SetDiskID(dev, pri_node)
5550       result = self.rpc.call_blockdev_find(pri_node, dev)
5551       msg = result.RemoteFailMsg()
5552       if not msg and not result.payload:
5553         msg = "disk not found"
5554       if msg:
5555         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5556                                  (idx, pri_node, msg))
5557
5558     # Step: check other node consistency
5559     self.proc.LogStep(2, steps_total, "check peer consistency")
5560     for idx, dev in enumerate(instance.disks):
5561       if idx not in self.op.disks:
5562         continue
5563       info("checking disk/%d consistency on %s" % (idx, pri_node))
5564       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5565         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5566                                  " unsafe to replace the secondary" %
5567                                  pri_node)
5568
5569     # Step: create new storage
5570     self.proc.LogStep(3, steps_total, "allocate new storage")
5571     for idx, dev in enumerate(instance.disks):
5572       info("adding new local storage on %s for disk/%d" %
5573            (new_node, idx))
5574       # we pass force_create=True to force LVM creation
5575       for new_lv in dev.children:
5576         _CreateBlockDev(self, new_node, instance, new_lv, True,
5577                         _GetInstanceInfoText(instance), False)
5578
5579     # Step 4: dbrd minors and drbd setups changes
5580     # after this, we must manually remove the drbd minors on both the
5581     # error and the success paths
5582     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5583                                    instance.name)
5584     logging.debug("Allocated minors %s" % (minors,))
5585     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5586     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5587       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5588       # create new devices on new_node; note that we create two IDs:
5589       # one without port, so the drbd will be activated without
5590       # networking information on the new node at this stage, and one
5591       # with network, for the latter activation in step 4
5592       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5593       if pri_node == o_node1:
5594         p_minor = o_minor1
5595       else:
5596         p_minor = o_minor2
5597
5598       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5599       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5600
5601       iv_names[idx] = (dev, dev.children, new_net_id)
5602       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5603                     new_net_id)
5604       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5605                               logical_id=new_alone_id,
5606                               children=dev.children,
5607                               size=dev.size)
5608       try:
5609         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5610                               _GetInstanceInfoText(instance), False)
5611       except errors.GenericError:
5612         self.cfg.ReleaseDRBDMinors(instance.name)
5613         raise
5614
5615     for idx, dev in enumerate(instance.disks):
5616       # we have new devices, shutdown the drbd on the old secondary
5617       info("shutting down drbd for disk/%d on old node" % idx)
5618       cfg.SetDiskID(dev, old_node)
5619       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5620       if msg:
5621         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5622                 (idx, msg),
5623                 hint="Please cleanup this device manually as soon as possible")
5624
5625     info("detaching primary drbds from the network (=> standalone)")
5626     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5627                                                instance.disks)[pri_node]
5628
5629     msg = result.RemoteFailMsg()
5630     if msg:
5631       # detaches didn't succeed (unlikely)
5632       self.cfg.ReleaseDRBDMinors(instance.name)
5633       raise errors.OpExecError("Can't detach the disks from the network on"
5634                                " old node: %s" % (msg,))
5635
5636     # if we managed to detach at least one, we update all the disks of
5637     # the instance to point to the new secondary
5638     info("updating instance configuration")
5639     for dev, _, new_logical_id in iv_names.itervalues():
5640       dev.logical_id = new_logical_id
5641       cfg.SetDiskID(dev, pri_node)
5642     cfg.Update(instance)
5643
5644     # and now perform the drbd attach
5645     info("attaching primary drbds to new secondary (standalone => connected)")
5646     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5647                                            instance.disks, instance.name,
5648                                            False)
5649     for to_node, to_result in result.items():
5650       msg = to_result.RemoteFailMsg()
5651       if msg:
5652         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5653                 hint="please do a gnt-instance info to see the"
5654                 " status of disks")
5655
5656     # this can fail as the old devices are degraded and _WaitForSync
5657     # does a combined result over all disks, so we don't check its
5658     # return value
5659     self.proc.LogStep(5, steps_total, "sync devices")
5660     _WaitForSync(self, instance, unlock=True)
5661
5662     # so check manually all the devices
5663     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5664       cfg.SetDiskID(dev, pri_node)
5665       result = self.rpc.call_blockdev_find(pri_node, dev)
5666       msg = result.RemoteFailMsg()
5667       if not msg and not result.payload:
5668         msg = "disk not found"
5669       if msg:
5670         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5671                                  (idx, msg))
5672       if result.payload[5]:
5673         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5674
5675     self.proc.LogStep(6, steps_total, "removing old storage")
5676     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5677       info("remove logical volumes for disk/%d" % idx)
5678       for lv in old_lvs:
5679         cfg.SetDiskID(lv, old_node)
5680         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5681         if msg:
5682           warning("Can't remove LV on old secondary: %s", msg,
5683                   hint="Cleanup stale volumes by hand")
5684
5685   def Exec(self, feedback_fn):
5686     """Execute disk replacement.
5687
5688     This dispatches the disk replacement to the appropriate handler.
5689
5690     """
5691     instance = self.instance
5692
5693     # Activate the instance disks if we're replacing them on a down instance
5694     if not instance.admin_up:
5695       _StartInstanceDisks(self, instance, True)
5696
5697     if self.op.mode == constants.REPLACE_DISK_CHG:
5698       fn = self._ExecD8Secondary
5699     else:
5700       fn = self._ExecD8DiskOnly
5701
5702     ret = fn(feedback_fn)
5703
5704     # Deactivate the instance disks if we're replacing them on a down instance
5705     if not instance.admin_up:
5706       _SafeShutdownInstanceDisks(self, instance)
5707
5708     return ret
5709
5710
5711 class LUGrowDisk(LogicalUnit):
5712   """Grow a disk of an instance.
5713
5714   """
5715   HPATH = "disk-grow"
5716   HTYPE = constants.HTYPE_INSTANCE
5717   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5718   REQ_BGL = False
5719
5720   def ExpandNames(self):
5721     self._ExpandAndLockInstance()
5722     self.needed_locks[locking.LEVEL_NODE] = []
5723     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5724
5725   def DeclareLocks(self, level):
5726     if level == locking.LEVEL_NODE:
5727       self._LockInstancesNodes()
5728
5729   def BuildHooksEnv(self):
5730     """Build hooks env.
5731
5732     This runs on the master, the primary and all the secondaries.
5733
5734     """
5735     env = {
5736       "DISK": self.op.disk,
5737       "AMOUNT": self.op.amount,
5738       }
5739     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5740     nl = [
5741       self.cfg.GetMasterNode(),
5742       self.instance.primary_node,
5743       ]
5744     return env, nl, nl
5745
5746   def CheckPrereq(self):
5747     """Check prerequisites.
5748
5749     This checks that the instance is in the cluster.
5750
5751     """
5752     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5753     assert instance is not None, \
5754       "Cannot retrieve locked instance %s" % self.op.instance_name
5755     nodenames = list(instance.all_nodes)
5756     for node in nodenames:
5757       _CheckNodeOnline(self, node)
5758
5759
5760     self.instance = instance
5761
5762     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5763       raise errors.OpPrereqError("Instance's disk layout does not support"
5764                                  " growing.")
5765
5766     self.disk = instance.FindDisk(self.op.disk)
5767
5768     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5769                                        instance.hypervisor)
5770     for node in nodenames:
5771       info = nodeinfo[node]
5772       if info.failed or not info.data:
5773         raise errors.OpPrereqError("Cannot get current information"
5774                                    " from node '%s'" % node)
5775       vg_free = info.data.get('vg_free', None)
5776       if not isinstance(vg_free, int):
5777         raise errors.OpPrereqError("Can't compute free disk space on"
5778                                    " node %s" % node)
5779       if self.op.amount > vg_free:
5780         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5781                                    " %d MiB available, %d MiB required" %
5782                                    (node, vg_free, self.op.amount))
5783
5784   def Exec(self, feedback_fn):
5785     """Execute disk grow.
5786
5787     """
5788     instance = self.instance
5789     disk = self.disk
5790     for node in instance.all_nodes:
5791       self.cfg.SetDiskID(disk, node)
5792       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5793       msg = result.RemoteFailMsg()
5794       if msg:
5795         raise errors.OpExecError("Grow request failed to node %s: %s" %
5796                                  (node, msg))
5797     disk.RecordGrow(self.op.amount)
5798     self.cfg.Update(instance)
5799     if self.op.wait_for_sync:
5800       disk_abort = not _WaitForSync(self, instance)
5801       if disk_abort:
5802         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5803                              " status.\nPlease check the instance.")
5804
5805
5806 class LUQueryInstanceData(NoHooksLU):
5807   """Query runtime instance data.
5808
5809   """
5810   _OP_REQP = ["instances", "static"]
5811   REQ_BGL = False
5812
5813   def ExpandNames(self):
5814     self.needed_locks = {}
5815     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5816
5817     if not isinstance(self.op.instances, list):
5818       raise errors.OpPrereqError("Invalid argument type 'instances'")
5819
5820     if self.op.instances:
5821       self.wanted_names = []
5822       for name in self.op.instances:
5823         full_name = self.cfg.ExpandInstanceName(name)
5824         if full_name is None:
5825           raise errors.OpPrereqError("Instance '%s' not known" % name)
5826         self.wanted_names.append(full_name)
5827       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5828     else:
5829       self.wanted_names = None
5830       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5831
5832     self.needed_locks[locking.LEVEL_NODE] = []
5833     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5834
5835   def DeclareLocks(self, level):
5836     if level == locking.LEVEL_NODE:
5837       self._LockInstancesNodes()
5838
5839   def CheckPrereq(self):
5840     """Check prerequisites.
5841
5842     This only checks the optional instance list against the existing names.
5843
5844     """
5845     if self.wanted_names is None:
5846       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5847
5848     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5849                              in self.wanted_names]
5850     return
5851
5852   def _ComputeDiskStatus(self, instance, snode, dev):
5853     """Compute block device status.
5854
5855     """
5856     static = self.op.static
5857     if not static:
5858       self.cfg.SetDiskID(dev, instance.primary_node)
5859       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5860       if dev_pstatus.offline:
5861         dev_pstatus = None
5862       else:
5863         msg = dev_pstatus.RemoteFailMsg()
5864         if msg:
5865           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5866                                    (instance.name, msg))
5867         dev_pstatus = dev_pstatus.payload
5868     else:
5869       dev_pstatus = None
5870
5871     if dev.dev_type in constants.LDS_DRBD:
5872       # we change the snode then (otherwise we use the one passed in)
5873       if dev.logical_id[0] == instance.primary_node:
5874         snode = dev.logical_id[1]
5875       else:
5876         snode = dev.logical_id[0]
5877
5878     if snode and not static:
5879       self.cfg.SetDiskID(dev, snode)
5880       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5881       if dev_sstatus.offline:
5882         dev_sstatus = None
5883       else:
5884         msg = dev_sstatus.RemoteFailMsg()
5885         if msg:
5886           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5887                                    (instance.name, msg))
5888         dev_sstatus = dev_sstatus.payload
5889     else:
5890       dev_sstatus = None
5891
5892     if dev.children:
5893       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5894                       for child in dev.children]
5895     else:
5896       dev_children = []
5897
5898     data = {
5899       "iv_name": dev.iv_name,
5900       "dev_type": dev.dev_type,
5901       "logical_id": dev.logical_id,
5902       "physical_id": dev.physical_id,
5903       "pstatus": dev_pstatus,
5904       "sstatus": dev_sstatus,
5905       "children": dev_children,
5906       "mode": dev.mode,
5907       "size": dev.size,
5908       }
5909
5910     return data
5911
5912   def Exec(self, feedback_fn):
5913     """Gather and return data"""
5914     result = {}
5915
5916     cluster = self.cfg.GetClusterInfo()
5917
5918     for instance in self.wanted_instances:
5919       if not self.op.static:
5920         remote_info = self.rpc.call_instance_info(instance.primary_node,
5921                                                   instance.name,
5922                                                   instance.hypervisor)
5923         remote_info.Raise()
5924         remote_info = remote_info.data
5925         if remote_info and "state" in remote_info:
5926           remote_state = "up"
5927         else:
5928           remote_state = "down"
5929       else:
5930         remote_state = None
5931       if instance.admin_up:
5932         config_state = "up"
5933       else:
5934         config_state = "down"
5935
5936       disks = [self._ComputeDiskStatus(instance, None, device)
5937                for device in instance.disks]
5938
5939       idict = {
5940         "name": instance.name,
5941         "config_state": config_state,
5942         "run_state": remote_state,
5943         "pnode": instance.primary_node,
5944         "snodes": instance.secondary_nodes,
5945         "os": instance.os,
5946         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5947         "disks": disks,
5948         "hypervisor": instance.hypervisor,
5949         "network_port": instance.network_port,
5950         "hv_instance": instance.hvparams,
5951         "hv_actual": cluster.FillHV(instance),
5952         "be_instance": instance.beparams,
5953         "be_actual": cluster.FillBE(instance),
5954         }
5955
5956       result[instance.name] = idict
5957
5958     return result
5959
5960
5961 class LUSetInstanceParams(LogicalUnit):
5962   """Modifies an instances's parameters.
5963
5964   """
5965   HPATH = "instance-modify"
5966   HTYPE = constants.HTYPE_INSTANCE
5967   _OP_REQP = ["instance_name"]
5968   REQ_BGL = False
5969
5970   def CheckArguments(self):
5971     if not hasattr(self.op, 'nics'):
5972       self.op.nics = []
5973     if not hasattr(self.op, 'disks'):
5974       self.op.disks = []
5975     if not hasattr(self.op, 'beparams'):
5976       self.op.beparams = {}
5977     if not hasattr(self.op, 'hvparams'):
5978       self.op.hvparams = {}
5979     self.op.force = getattr(self.op, "force", False)
5980     if not (self.op.nics or self.op.disks or
5981             self.op.hvparams or self.op.beparams):
5982       raise errors.OpPrereqError("No changes submitted")
5983
5984     # Disk validation
5985     disk_addremove = 0
5986     for disk_op, disk_dict in self.op.disks:
5987       if disk_op == constants.DDM_REMOVE:
5988         disk_addremove += 1
5989         continue
5990       elif disk_op == constants.DDM_ADD:
5991         disk_addremove += 1
5992       else:
5993         if not isinstance(disk_op, int):
5994           raise errors.OpPrereqError("Invalid disk index")
5995       if disk_op == constants.DDM_ADD:
5996         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5997         if mode not in constants.DISK_ACCESS_SET:
5998           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5999         size = disk_dict.get('size', None)
6000         if size is None:
6001           raise errors.OpPrereqError("Required disk parameter size missing")
6002         try:
6003           size = int(size)
6004         except ValueError, err:
6005           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
6006                                      str(err))
6007         disk_dict['size'] = size
6008       else:
6009         # modification of disk
6010         if 'size' in disk_dict:
6011           raise errors.OpPrereqError("Disk size change not possible, use"
6012                                      " grow-disk")
6013
6014     if disk_addremove > 1:
6015       raise errors.OpPrereqError("Only one disk add or remove operation"
6016                                  " supported at a time")
6017
6018     # NIC validation
6019     nic_addremove = 0
6020     for nic_op, nic_dict in self.op.nics:
6021       if nic_op == constants.DDM_REMOVE:
6022         nic_addremove += 1
6023         continue
6024       elif nic_op == constants.DDM_ADD:
6025         nic_addremove += 1
6026       else:
6027         if not isinstance(nic_op, int):
6028           raise errors.OpPrereqError("Invalid nic index")
6029
6030       # nic_dict should be a dict
6031       nic_ip = nic_dict.get('ip', None)
6032       if nic_ip is not None:
6033         if nic_ip.lower() == constants.VALUE_NONE:
6034           nic_dict['ip'] = None
6035         else:
6036           if not utils.IsValidIP(nic_ip):
6037             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
6038
6039       if nic_op == constants.DDM_ADD:
6040         nic_bridge = nic_dict.get('bridge', None)
6041         if nic_bridge is None:
6042           nic_dict['bridge'] = self.cfg.GetDefBridge()
6043         nic_mac = nic_dict.get('mac', None)
6044         if nic_mac is None:
6045           nic_dict['mac'] = constants.VALUE_AUTO
6046
6047       if 'mac' in nic_dict:
6048         nic_mac = nic_dict['mac']
6049         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6050           if not utils.IsValidMac(nic_mac):
6051             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
6052         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
6053           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
6054                                      " modifying an existing nic")
6055
6056     if nic_addremove > 1:
6057       raise errors.OpPrereqError("Only one NIC add or remove operation"
6058                                  " supported at a time")
6059
6060   def ExpandNames(self):
6061     self._ExpandAndLockInstance()
6062     self.needed_locks[locking.LEVEL_NODE] = []
6063     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
6064
6065   def DeclareLocks(self, level):
6066     if level == locking.LEVEL_NODE:
6067       self._LockInstancesNodes()
6068
6069   def BuildHooksEnv(self):
6070     """Build hooks env.
6071
6072     This runs on the master, primary and secondaries.
6073
6074     """
6075     args = dict()
6076     if constants.BE_MEMORY in self.be_new:
6077       args['memory'] = self.be_new[constants.BE_MEMORY]
6078     if constants.BE_VCPUS in self.be_new:
6079       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6080     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6081     # information at all.
6082     if self.op.nics:
6083       args['nics'] = []
6084       nic_override = dict(self.op.nics)
6085       for idx, nic in enumerate(self.instance.nics):
6086         if idx in nic_override:
6087           this_nic_override = nic_override[idx]
6088         else:
6089           this_nic_override = {}
6090         if 'ip' in this_nic_override:
6091           ip = this_nic_override['ip']
6092         else:
6093           ip = nic.ip
6094         if 'bridge' in this_nic_override:
6095           bridge = this_nic_override['bridge']
6096         else:
6097           bridge = nic.bridge
6098         if 'mac' in this_nic_override:
6099           mac = this_nic_override['mac']
6100         else:
6101           mac = nic.mac
6102         args['nics'].append((ip, bridge, mac))
6103       if constants.DDM_ADD in nic_override:
6104         ip = nic_override[constants.DDM_ADD].get('ip', None)
6105         bridge = nic_override[constants.DDM_ADD]['bridge']
6106         mac = nic_override[constants.DDM_ADD]['mac']
6107         args['nics'].append((ip, bridge, mac))
6108       elif constants.DDM_REMOVE in nic_override:
6109         del args['nics'][-1]
6110
6111     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6112     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6113     return env, nl, nl
6114
6115   def CheckPrereq(self):
6116     """Check prerequisites.
6117
6118     This only checks the instance list against the existing names.
6119
6120     """
6121     self.force = self.op.force
6122
6123     # checking the new params on the primary/secondary nodes
6124
6125     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6126     assert self.instance is not None, \
6127       "Cannot retrieve locked instance %s" % self.op.instance_name
6128     pnode = instance.primary_node
6129     nodelist = list(instance.all_nodes)
6130
6131     # hvparams processing
6132     if self.op.hvparams:
6133       i_hvdict = copy.deepcopy(instance.hvparams)
6134       for key, val in self.op.hvparams.iteritems():
6135         if val == constants.VALUE_DEFAULT:
6136           try:
6137             del i_hvdict[key]
6138           except KeyError:
6139             pass
6140         else:
6141           i_hvdict[key] = val
6142       cluster = self.cfg.GetClusterInfo()
6143       utils.ForceDictType(i_hvdict, constants.HVS_PARAMETER_TYPES)
6144       hv_new = cluster.FillDict(cluster.hvparams[instance.hypervisor],
6145                                 i_hvdict)
6146       # local check
6147       hypervisor.GetHypervisor(
6148         instance.hypervisor).CheckParameterSyntax(hv_new)
6149       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6150       self.hv_new = hv_new # the new actual values
6151       self.hv_inst = i_hvdict # the new dict (without defaults)
6152     else:
6153       self.hv_new = self.hv_inst = {}
6154
6155     # beparams processing
6156     if self.op.beparams:
6157       i_bedict = copy.deepcopy(instance.beparams)
6158       for key, val in self.op.beparams.iteritems():
6159         if val == constants.VALUE_DEFAULT:
6160           try:
6161             del i_bedict[key]
6162           except KeyError:
6163             pass
6164         else:
6165           i_bedict[key] = val
6166       cluster = self.cfg.GetClusterInfo()
6167       utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES)
6168       be_new = cluster.FillDict(cluster.beparams[constants.BEGR_DEFAULT],
6169                                 i_bedict)
6170       self.be_new = be_new # the new actual values
6171       self.be_inst = i_bedict # the new dict (without defaults)
6172     else:
6173       self.be_new = self.be_inst = {}
6174
6175     self.warn = []
6176
6177     if constants.BE_MEMORY in self.op.beparams and not self.force:
6178       mem_check_list = [pnode]
6179       if be_new[constants.BE_AUTO_BALANCE]:
6180         # either we changed auto_balance to yes or it was from before
6181         mem_check_list.extend(instance.secondary_nodes)
6182       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6183                                                   instance.hypervisor)
6184       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6185                                          instance.hypervisor)
6186       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6187         # Assume the primary node is unreachable and go ahead
6188         self.warn.append("Can't get info from primary node %s" % pnode)
6189       else:
6190         if not instance_info.failed and instance_info.data:
6191           current_mem = int(instance_info.data['memory'])
6192         else:
6193           # Assume instance not running
6194           # (there is a slight race condition here, but it's not very probable,
6195           # and we have no other way to check)
6196           current_mem = 0
6197         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6198                     nodeinfo[pnode].data['memory_free'])
6199         if miss_mem > 0:
6200           raise errors.OpPrereqError("This change will prevent the instance"
6201                                      " from starting, due to %d MB of memory"
6202                                      " missing on its primary node" % miss_mem)
6203
6204       if be_new[constants.BE_AUTO_BALANCE]:
6205         for node, nres in nodeinfo.iteritems():
6206           if node not in instance.secondary_nodes:
6207             continue
6208           if nres.failed or not isinstance(nres.data, dict):
6209             self.warn.append("Can't get info from secondary node %s" % node)
6210           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6211             self.warn.append("Not enough memory to failover instance to"
6212                              " secondary node %s" % node)
6213
6214     # NIC processing
6215     for nic_op, nic_dict in self.op.nics:
6216       if nic_op == constants.DDM_REMOVE:
6217         if not instance.nics:
6218           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6219         continue
6220       if nic_op != constants.DDM_ADD:
6221         # an existing nic
6222         if nic_op < 0 or nic_op >= len(instance.nics):
6223           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6224                                      " are 0 to %d" %
6225                                      (nic_op, len(instance.nics)))
6226       if 'bridge' in nic_dict:
6227         nic_bridge = nic_dict['bridge']
6228         if nic_bridge is None:
6229           raise errors.OpPrereqError('Cannot set the nic bridge to None')
6230         if not self.rpc.call_bridges_exist(pnode, [nic_bridge]):
6231           msg = ("Bridge '%s' doesn't exist on one of"
6232                  " the instance nodes" % nic_bridge)
6233           if self.force:
6234             self.warn.append(msg)
6235           else:
6236             raise errors.OpPrereqError(msg)
6237       if 'mac' in nic_dict:
6238         nic_mac = nic_dict['mac']
6239         if nic_mac is None:
6240           raise errors.OpPrereqError('Cannot set the nic mac to None')
6241         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6242           # otherwise generate the mac
6243           nic_dict['mac'] = self.cfg.GenerateMAC()
6244         else:
6245           # or validate/reserve the current one
6246           if self.cfg.IsMacInUse(nic_mac):
6247             raise errors.OpPrereqError("MAC address %s already in use"
6248                                        " in cluster" % nic_mac)
6249
6250     # DISK processing
6251     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6252       raise errors.OpPrereqError("Disk operations not supported for"
6253                                  " diskless instances")
6254     for disk_op, disk_dict in self.op.disks:
6255       if disk_op == constants.DDM_REMOVE:
6256         if len(instance.disks) == 1:
6257           raise errors.OpPrereqError("Cannot remove the last disk of"
6258                                      " an instance")
6259         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6260         ins_l = ins_l[pnode]
6261         if ins_l.failed or not isinstance(ins_l.data, list):
6262           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6263         if instance.name in ins_l.data:
6264           raise errors.OpPrereqError("Instance is running, can't remove"
6265                                      " disks.")
6266
6267       if (disk_op == constants.DDM_ADD and
6268           len(instance.nics) >= constants.MAX_DISKS):
6269         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6270                                    " add more" % constants.MAX_DISKS)
6271       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6272         # an existing disk
6273         if disk_op < 0 or disk_op >= len(instance.disks):
6274           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6275                                      " are 0 to %d" %
6276                                      (disk_op, len(instance.disks)))
6277
6278     return
6279
6280   def Exec(self, feedback_fn):
6281     """Modifies an instance.
6282
6283     All parameters take effect only at the next restart of the instance.
6284
6285     """
6286     # Process here the warnings from CheckPrereq, as we don't have a
6287     # feedback_fn there.
6288     for warn in self.warn:
6289       feedback_fn("WARNING: %s" % warn)
6290
6291     result = []
6292     instance = self.instance
6293     # disk changes
6294     for disk_op, disk_dict in self.op.disks:
6295       if disk_op == constants.DDM_REMOVE:
6296         # remove the last disk
6297         device = instance.disks.pop()
6298         device_idx = len(instance.disks)
6299         for node, disk in device.ComputeNodeTree(instance.primary_node):
6300           self.cfg.SetDiskID(disk, node)
6301           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6302           if msg:
6303             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6304                             " continuing anyway", device_idx, node, msg)
6305         result.append(("disk/%d" % device_idx, "remove"))
6306       elif disk_op == constants.DDM_ADD:
6307         # add a new disk
6308         if instance.disk_template == constants.DT_FILE:
6309           file_driver, file_path = instance.disks[0].logical_id
6310           file_path = os.path.dirname(file_path)
6311         else:
6312           file_driver = file_path = None
6313         disk_idx_base = len(instance.disks)
6314         new_disk = _GenerateDiskTemplate(self,
6315                                          instance.disk_template,
6316                                          instance.name, instance.primary_node,
6317                                          instance.secondary_nodes,
6318                                          [disk_dict],
6319                                          file_path,
6320                                          file_driver,
6321                                          disk_idx_base)[0]
6322         instance.disks.append(new_disk)
6323         info = _GetInstanceInfoText(instance)
6324
6325         logging.info("Creating volume %s for instance %s",
6326                      new_disk.iv_name, instance.name)
6327         # Note: this needs to be kept in sync with _CreateDisks
6328         #HARDCODE
6329         for node in instance.all_nodes:
6330           f_create = node == instance.primary_node
6331           try:
6332             _CreateBlockDev(self, node, instance, new_disk,
6333                             f_create, info, f_create)
6334           except errors.OpExecError, err:
6335             self.LogWarning("Failed to create volume %s (%s) on"
6336                             " node %s: %s",
6337                             new_disk.iv_name, new_disk, node, err)
6338         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6339                        (new_disk.size, new_disk.mode)))
6340       else:
6341         # change a given disk
6342         instance.disks[disk_op].mode = disk_dict['mode']
6343         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6344     # NIC changes
6345     for nic_op, nic_dict in self.op.nics:
6346       if nic_op == constants.DDM_REMOVE:
6347         # remove the last nic
6348         del instance.nics[-1]
6349         result.append(("nic.%d" % len(instance.nics), "remove"))
6350       elif nic_op == constants.DDM_ADD:
6351         # mac and bridge should be set, by now
6352         mac = nic_dict['mac']
6353         bridge = nic_dict['bridge']
6354         new_nic = objects.NIC(mac=mac, ip=nic_dict.get('ip', None),
6355                               bridge=bridge)
6356         instance.nics.append(new_nic)
6357         result.append(("nic.%d" % (len(instance.nics) - 1),
6358                        "add:mac=%s,ip=%s,bridge=%s" %
6359                        (new_nic.mac, new_nic.ip, new_nic.bridge)))
6360       else:
6361         # change a given nic
6362         for key in 'mac', 'ip', 'bridge':
6363           if key in nic_dict:
6364             setattr(instance.nics[nic_op], key, nic_dict[key])
6365             result.append(("nic.%s/%d" % (key, nic_op), nic_dict[key]))
6366
6367     # hvparams changes
6368     if self.op.hvparams:
6369       instance.hvparams = self.hv_inst
6370       for key, val in self.op.hvparams.iteritems():
6371         result.append(("hv/%s" % key, val))
6372
6373     # beparams changes
6374     if self.op.beparams:
6375       instance.beparams = self.be_inst
6376       for key, val in self.op.beparams.iteritems():
6377         result.append(("be/%s" % key, val))
6378
6379     self.cfg.Update(instance)
6380
6381     return result
6382
6383
6384 class LUQueryExports(NoHooksLU):
6385   """Query the exports list
6386
6387   """
6388   _OP_REQP = ['nodes']
6389   REQ_BGL = False
6390
6391   def ExpandNames(self):
6392     self.needed_locks = {}
6393     self.share_locks[locking.LEVEL_NODE] = 1
6394     if not self.op.nodes:
6395       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6396     else:
6397       self.needed_locks[locking.LEVEL_NODE] = \
6398         _GetWantedNodes(self, self.op.nodes)
6399
6400   def CheckPrereq(self):
6401     """Check prerequisites.
6402
6403     """
6404     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6405
6406   def Exec(self, feedback_fn):
6407     """Compute the list of all the exported system images.
6408
6409     @rtype: dict
6410     @return: a dictionary with the structure node->(export-list)
6411         where export-list is a list of the instances exported on
6412         that node.
6413
6414     """
6415     rpcresult = self.rpc.call_export_list(self.nodes)
6416     result = {}
6417     for node in rpcresult:
6418       if rpcresult[node].failed:
6419         result[node] = False
6420       else:
6421         result[node] = rpcresult[node].data
6422
6423     return result
6424
6425
6426 class LUExportInstance(LogicalUnit):
6427   """Export an instance to an image in the cluster.
6428
6429   """
6430   HPATH = "instance-export"
6431   HTYPE = constants.HTYPE_INSTANCE
6432   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6433   REQ_BGL = False
6434
6435   def ExpandNames(self):
6436     self._ExpandAndLockInstance()
6437     # FIXME: lock only instance primary and destination node
6438     #
6439     # Sad but true, for now we have do lock all nodes, as we don't know where
6440     # the previous export might be, and and in this LU we search for it and
6441     # remove it from its current node. In the future we could fix this by:
6442     #  - making a tasklet to search (share-lock all), then create the new one,
6443     #    then one to remove, after
6444     #  - removing the removal operation altogether
6445     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6446
6447   def DeclareLocks(self, level):
6448     """Last minute lock declaration."""
6449     # All nodes are locked anyway, so nothing to do here.
6450
6451   def BuildHooksEnv(self):
6452     """Build hooks env.
6453
6454     This will run on the master, primary node and target node.
6455
6456     """
6457     env = {
6458       "EXPORT_NODE": self.op.target_node,
6459       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6460       }
6461     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6462     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6463           self.op.target_node]
6464     return env, nl, nl
6465
6466   def CheckPrereq(self):
6467     """Check prerequisites.
6468
6469     This checks that the instance and node names are valid.
6470
6471     """
6472     instance_name = self.op.instance_name
6473     self.instance = self.cfg.GetInstanceInfo(instance_name)
6474     assert self.instance is not None, \
6475           "Cannot retrieve locked instance %s" % self.op.instance_name
6476     _CheckNodeOnline(self, self.instance.primary_node)
6477
6478     self.dst_node = self.cfg.GetNodeInfo(
6479       self.cfg.ExpandNodeName(self.op.target_node))
6480
6481     if self.dst_node is None:
6482       # This is wrong node name, not a non-locked node
6483       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6484     _CheckNodeOnline(self, self.dst_node.name)
6485     _CheckNodeNotDrained(self, self.dst_node.name)
6486
6487     # instance disk type verification
6488     for disk in self.instance.disks:
6489       if disk.dev_type == constants.LD_FILE:
6490         raise errors.OpPrereqError("Export not supported for instances with"
6491                                    " file-based disks")
6492
6493   def Exec(self, feedback_fn):
6494     """Export an instance to an image in the cluster.
6495
6496     """
6497     instance = self.instance
6498     dst_node = self.dst_node
6499     src_node = instance.primary_node
6500     if self.op.shutdown:
6501       # shutdown the instance, but not the disks
6502       result = self.rpc.call_instance_shutdown(src_node, instance)
6503       msg = result.RemoteFailMsg()
6504       if msg:
6505         raise errors.OpExecError("Could not shutdown instance %s on"
6506                                  " node %s: %s" %
6507                                  (instance.name, src_node, msg))
6508
6509     vgname = self.cfg.GetVGName()
6510
6511     snap_disks = []
6512
6513     # set the disks ID correctly since call_instance_start needs the
6514     # correct drbd minor to create the symlinks
6515     for disk in instance.disks:
6516       self.cfg.SetDiskID(disk, src_node)
6517
6518     # per-disk results
6519     dresults = []
6520     try:
6521       for idx, disk in enumerate(instance.disks):
6522         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
6523         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
6524         if new_dev_name.failed or not new_dev_name.data:
6525           self.LogWarning("Could not snapshot disk/%d on node %s",
6526                           idx, src_node)
6527           snap_disks.append(False)
6528         else:
6529           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6530                                  logical_id=(vgname, new_dev_name.data),
6531                                  physical_id=(vgname, new_dev_name.data),
6532                                  iv_name=disk.iv_name)
6533           snap_disks.append(new_dev)
6534
6535     finally:
6536       if self.op.shutdown and instance.admin_up:
6537         result = self.rpc.call_instance_start(src_node, instance, None, None)
6538         msg = result.RemoteFailMsg()
6539         if msg:
6540           _ShutdownInstanceDisks(self, instance)
6541           raise errors.OpExecError("Could not start instance: %s" % msg)
6542
6543     # TODO: check for size
6544
6545     cluster_name = self.cfg.GetClusterName()
6546     for idx, dev in enumerate(snap_disks):
6547       if dev:
6548         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6549                                                instance, cluster_name, idx)
6550         if result.failed or not result.data:
6551           self.LogWarning("Could not export disk/%d from node %s to"
6552                           " node %s", idx, src_node, dst_node.name)
6553           dresults.append(False)
6554         else:
6555           dresults.append(True)
6556         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6557         if msg:
6558           self.LogWarning("Could not remove snapshot for disk/%d from node"
6559                           " %s: %s", idx, src_node, msg)
6560       else:
6561         dresults.append(False)
6562
6563     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6564     fin_resu = True
6565     if result.failed or not result.data:
6566       self.LogWarning("Could not finalize export for instance %s on node %s",
6567                       instance.name, dst_node.name)
6568       fin_resu = False
6569
6570     nodelist = self.cfg.GetNodeList()
6571     nodelist.remove(dst_node.name)
6572
6573     # on one-node clusters nodelist will be empty after the removal
6574     # if we proceed the backup would be removed because OpQueryExports
6575     # substitutes an empty list with the full cluster node list.
6576     if nodelist:
6577       exportlist = self.rpc.call_export_list(nodelist)
6578       for node in exportlist:
6579         if exportlist[node].failed:
6580           continue
6581         if instance.name in exportlist[node].data:
6582           if not self.rpc.call_export_remove(node, instance.name):
6583             self.LogWarning("Could not remove older export for instance %s"
6584                             " on node %s", instance.name, node)
6585     return fin_resu, dresults
6586
6587
6588 class LURemoveExport(NoHooksLU):
6589   """Remove exports related to the named instance.
6590
6591   """
6592   _OP_REQP = ["instance_name"]
6593   REQ_BGL = False
6594
6595   def ExpandNames(self):
6596     self.needed_locks = {}
6597     # We need all nodes to be locked in order for RemoveExport to work, but we
6598     # don't need to lock the instance itself, as nothing will happen to it (and
6599     # we can remove exports also for a removed instance)
6600     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6601
6602   def CheckPrereq(self):
6603     """Check prerequisites.
6604     """
6605     pass
6606
6607   def Exec(self, feedback_fn):
6608     """Remove any export.
6609
6610     """
6611     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6612     # If the instance was not found we'll try with the name that was passed in.
6613     # This will only work if it was an FQDN, though.
6614     fqdn_warn = False
6615     if not instance_name:
6616       fqdn_warn = True
6617       instance_name = self.op.instance_name
6618
6619     exportlist = self.rpc.call_export_list(self.acquired_locks[
6620       locking.LEVEL_NODE])
6621     found = False
6622     for node in exportlist:
6623       if exportlist[node].failed:
6624         self.LogWarning("Failed to query node %s, continuing" % node)
6625         continue
6626       if instance_name in exportlist[node].data:
6627         found = True
6628         result = self.rpc.call_export_remove(node, instance_name)
6629         if result.failed or not result.data:
6630           logging.error("Could not remove export for instance %s"
6631                         " on node %s", instance_name, node)
6632
6633     if fqdn_warn and not found:
6634       feedback_fn("Export not found. If trying to remove an export belonging"
6635                   " to a deleted instance please use its Fully Qualified"
6636                   " Domain Name.")
6637
6638
6639 class TagsLU(NoHooksLU):
6640   """Generic tags LU.
6641
6642   This is an abstract class which is the parent of all the other tags LUs.
6643
6644   """
6645
6646   def ExpandNames(self):
6647     self.needed_locks = {}
6648     if self.op.kind == constants.TAG_NODE:
6649       name = self.cfg.ExpandNodeName(self.op.name)
6650       if name is None:
6651         raise errors.OpPrereqError("Invalid node name (%s)" %
6652                                    (self.op.name,))
6653       self.op.name = name
6654       self.needed_locks[locking.LEVEL_NODE] = name
6655     elif self.op.kind == constants.TAG_INSTANCE:
6656       name = self.cfg.ExpandInstanceName(self.op.name)
6657       if name is None:
6658         raise errors.OpPrereqError("Invalid instance name (%s)" %
6659                                    (self.op.name,))
6660       self.op.name = name
6661       self.needed_locks[locking.LEVEL_INSTANCE] = name
6662
6663   def CheckPrereq(self):
6664     """Check prerequisites.
6665
6666     """
6667     if self.op.kind == constants.TAG_CLUSTER:
6668       self.target = self.cfg.GetClusterInfo()
6669     elif self.op.kind == constants.TAG_NODE:
6670       self.target = self.cfg.GetNodeInfo(self.op.name)
6671     elif self.op.kind == constants.TAG_INSTANCE:
6672       self.target = self.cfg.GetInstanceInfo(self.op.name)
6673     else:
6674       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6675                                  str(self.op.kind))
6676
6677
6678 class LUGetTags(TagsLU):
6679   """Returns the tags of a given object.
6680
6681   """
6682   _OP_REQP = ["kind", "name"]
6683   REQ_BGL = False
6684
6685   def Exec(self, feedback_fn):
6686     """Returns the tag list.
6687
6688     """
6689     return list(self.target.GetTags())
6690
6691
6692 class LUSearchTags(NoHooksLU):
6693   """Searches the tags for a given pattern.
6694
6695   """
6696   _OP_REQP = ["pattern"]
6697   REQ_BGL = False
6698
6699   def ExpandNames(self):
6700     self.needed_locks = {}
6701
6702   def CheckPrereq(self):
6703     """Check prerequisites.
6704
6705     This checks the pattern passed for validity by compiling it.
6706
6707     """
6708     try:
6709       self.re = re.compile(self.op.pattern)
6710     except re.error, err:
6711       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6712                                  (self.op.pattern, err))
6713
6714   def Exec(self, feedback_fn):
6715     """Returns the tag list.
6716
6717     """
6718     cfg = self.cfg
6719     tgts = [("/cluster", cfg.GetClusterInfo())]
6720     ilist = cfg.GetAllInstancesInfo().values()
6721     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6722     nlist = cfg.GetAllNodesInfo().values()
6723     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6724     results = []
6725     for path, target in tgts:
6726       for tag in target.GetTags():
6727         if self.re.search(tag):
6728           results.append((path, tag))
6729     return results
6730
6731
6732 class LUAddTags(TagsLU):
6733   """Sets a tag on a given object.
6734
6735   """
6736   _OP_REQP = ["kind", "name", "tags"]
6737   REQ_BGL = False
6738
6739   def CheckPrereq(self):
6740     """Check prerequisites.
6741
6742     This checks the type and length of the tag name and value.
6743
6744     """
6745     TagsLU.CheckPrereq(self)
6746     for tag in self.op.tags:
6747       objects.TaggableObject.ValidateTag(tag)
6748
6749   def Exec(self, feedback_fn):
6750     """Sets the tag.
6751
6752     """
6753     try:
6754       for tag in self.op.tags:
6755         self.target.AddTag(tag)
6756     except errors.TagError, err:
6757       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6758     try:
6759       self.cfg.Update(self.target)
6760     except errors.ConfigurationError:
6761       raise errors.OpRetryError("There has been a modification to the"
6762                                 " config file and the operation has been"
6763                                 " aborted. Please retry.")
6764
6765
6766 class LUDelTags(TagsLU):
6767   """Delete a list of tags from a given object.
6768
6769   """
6770   _OP_REQP = ["kind", "name", "tags"]
6771   REQ_BGL = False
6772
6773   def CheckPrereq(self):
6774     """Check prerequisites.
6775
6776     This checks that we have the given tag.
6777
6778     """
6779     TagsLU.CheckPrereq(self)
6780     for tag in self.op.tags:
6781       objects.TaggableObject.ValidateTag(tag)
6782     del_tags = frozenset(self.op.tags)
6783     cur_tags = self.target.GetTags()
6784     if not del_tags <= cur_tags:
6785       diff_tags = del_tags - cur_tags
6786       diff_names = ["'%s'" % tag for tag in diff_tags]
6787       diff_names.sort()
6788       raise errors.OpPrereqError("Tag(s) %s not found" %
6789                                  (",".join(diff_names)))
6790
6791   def Exec(self, feedback_fn):
6792     """Remove the tag from the object.
6793
6794     """
6795     for tag in self.op.tags:
6796       self.target.RemoveTag(tag)
6797     try:
6798       self.cfg.Update(self.target)
6799     except errors.ConfigurationError:
6800       raise errors.OpRetryError("There has been a modification to the"
6801                                 " config file and the operation has been"
6802                                 " aborted. Please retry.")
6803
6804
6805 class LUTestDelay(NoHooksLU):
6806   """Sleep for a specified amount of time.
6807
6808   This LU sleeps on the master and/or nodes for a specified amount of
6809   time.
6810
6811   """
6812   _OP_REQP = ["duration", "on_master", "on_nodes"]
6813   REQ_BGL = False
6814
6815   def ExpandNames(self):
6816     """Expand names and set required locks.
6817
6818     This expands the node list, if any.
6819
6820     """
6821     self.needed_locks = {}
6822     if self.op.on_nodes:
6823       # _GetWantedNodes can be used here, but is not always appropriate to use
6824       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6825       # more information.
6826       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6827       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6828
6829   def CheckPrereq(self):
6830     """Check prerequisites.
6831
6832     """
6833
6834   def Exec(self, feedback_fn):
6835     """Do the actual sleep.
6836
6837     """
6838     if self.op.on_master:
6839       if not utils.TestDelay(self.op.duration):
6840         raise errors.OpExecError("Error during master delay test")
6841     if self.op.on_nodes:
6842       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6843       if not result:
6844         raise errors.OpExecError("Complete failure from rpc call")
6845       for node, node_result in result.items():
6846         node_result.Raise()
6847         if not node_result.data:
6848           raise errors.OpExecError("Failure during rpc call to node %s,"
6849                                    " result: %s" % (node, node_result.data))
6850
6851
6852 class IAllocator(object):
6853   """IAllocator framework.
6854
6855   An IAllocator instance has three sets of attributes:
6856     - cfg that is needed to query the cluster
6857     - input data (all members of the _KEYS class attribute are required)
6858     - four buffer attributes (in|out_data|text), that represent the
6859       input (to the external script) in text and data structure format,
6860       and the output from it, again in two formats
6861     - the result variables from the script (success, info, nodes) for
6862       easy usage
6863
6864   """
6865   _ALLO_KEYS = [
6866     "mem_size", "disks", "disk_template",
6867     "os", "tags", "nics", "vcpus", "hypervisor",
6868     ]
6869   _RELO_KEYS = [
6870     "relocate_from",
6871     ]
6872
6873   def __init__(self, lu, mode, name, **kwargs):
6874     self.lu = lu
6875     # init buffer variables
6876     self.in_text = self.out_text = self.in_data = self.out_data = None
6877     # init all input fields so that pylint is happy
6878     self.mode = mode
6879     self.name = name
6880     self.mem_size = self.disks = self.disk_template = None
6881     self.os = self.tags = self.nics = self.vcpus = None
6882     self.hypervisor = None
6883     self.relocate_from = None
6884     # computed fields
6885     self.required_nodes = None
6886     # init result fields
6887     self.success = self.info = self.nodes = None
6888     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6889       keyset = self._ALLO_KEYS
6890     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6891       keyset = self._RELO_KEYS
6892     else:
6893       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6894                                    " IAllocator" % self.mode)
6895     for key in kwargs:
6896       if key not in keyset:
6897         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6898                                      " IAllocator" % key)
6899       setattr(self, key, kwargs[key])
6900     for key in keyset:
6901       if key not in kwargs:
6902         raise errors.ProgrammerError("Missing input parameter '%s' to"
6903                                      " IAllocator" % key)
6904     self._BuildInputData()
6905
6906   def _ComputeClusterData(self):
6907     """Compute the generic allocator input data.
6908
6909     This is the data that is independent of the actual operation.
6910
6911     """
6912     cfg = self.lu.cfg
6913     cluster_info = cfg.GetClusterInfo()
6914     # cluster data
6915     data = {
6916       "version": constants.IALLOCATOR_VERSION,
6917       "cluster_name": cfg.GetClusterName(),
6918       "cluster_tags": list(cluster_info.GetTags()),
6919       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6920       # we don't have job IDs
6921       }
6922     iinfo = cfg.GetAllInstancesInfo().values()
6923     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6924
6925     # node data
6926     node_results = {}
6927     node_list = cfg.GetNodeList()
6928
6929     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6930       hypervisor_name = self.hypervisor
6931     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6932       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6933
6934     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6935                                            hypervisor_name)
6936     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6937                        cluster_info.enabled_hypervisors)
6938     for nname, nresult in node_data.items():
6939       # first fill in static (config-based) values
6940       ninfo = cfg.GetNodeInfo(nname)
6941       pnr = {
6942         "tags": list(ninfo.GetTags()),
6943         "primary_ip": ninfo.primary_ip,
6944         "secondary_ip": ninfo.secondary_ip,
6945         "offline": ninfo.offline,
6946         "drained": ninfo.drained,
6947         "master_candidate": ninfo.master_candidate,
6948         }
6949
6950       if not (ninfo.offline or ninfo.drained):
6951         nresult.Raise()
6952         if not isinstance(nresult.data, dict):
6953           raise errors.OpExecError("Can't get data for node %s" % nname)
6954         remote_info = nresult.data
6955         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6956                      'vg_size', 'vg_free', 'cpu_total']:
6957           if attr not in remote_info:
6958             raise errors.OpExecError("Node '%s' didn't return attribute"
6959                                      " '%s'" % (nname, attr))
6960           try:
6961             remote_info[attr] = int(remote_info[attr])
6962           except ValueError, err:
6963             raise errors.OpExecError("Node '%s' returned invalid value"
6964                                      " for '%s': %s" % (nname, attr, err))
6965         # compute memory used by primary instances
6966         i_p_mem = i_p_up_mem = 0
6967         for iinfo, beinfo in i_list:
6968           if iinfo.primary_node == nname:
6969             i_p_mem += beinfo[constants.BE_MEMORY]
6970             if iinfo.name not in node_iinfo[nname].data:
6971               i_used_mem = 0
6972             else:
6973               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6974             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6975             remote_info['memory_free'] -= max(0, i_mem_diff)
6976
6977             if iinfo.admin_up:
6978               i_p_up_mem += beinfo[constants.BE_MEMORY]
6979
6980         # compute memory used by instances
6981         pnr_dyn = {
6982           "total_memory": remote_info['memory_total'],
6983           "reserved_memory": remote_info['memory_dom0'],
6984           "free_memory": remote_info['memory_free'],
6985           "total_disk": remote_info['vg_size'],
6986           "free_disk": remote_info['vg_free'],
6987           "total_cpus": remote_info['cpu_total'],
6988           "i_pri_memory": i_p_mem,
6989           "i_pri_up_memory": i_p_up_mem,
6990           }
6991         pnr.update(pnr_dyn)
6992
6993       node_results[nname] = pnr
6994     data["nodes"] = node_results
6995
6996     # instance data
6997     instance_data = {}
6998     for iinfo, beinfo in i_list:
6999       nic_data = [{"mac": n.mac, "ip": n.ip, "bridge": n.bridge}
7000                   for n in iinfo.nics]
7001       pir = {
7002         "tags": list(iinfo.GetTags()),
7003         "admin_up": iinfo.admin_up,
7004         "vcpus": beinfo[constants.BE_VCPUS],
7005         "memory": beinfo[constants.BE_MEMORY],
7006         "os": iinfo.os,
7007         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7008         "nics": nic_data,
7009         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7010         "disk_template": iinfo.disk_template,
7011         "hypervisor": iinfo.hypervisor,
7012         }
7013       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7014                                                  pir["disks"])
7015       instance_data[iinfo.name] = pir
7016
7017     data["instances"] = instance_data
7018
7019     self.in_data = data
7020
7021   def _AddNewInstance(self):
7022     """Add new instance data to allocator structure.
7023
7024     This in combination with _AllocatorGetClusterData will create the
7025     correct structure needed as input for the allocator.
7026
7027     The checks for the completeness of the opcode must have already been
7028     done.
7029
7030     """
7031     data = self.in_data
7032
7033     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7034
7035     if self.disk_template in constants.DTS_NET_MIRROR:
7036       self.required_nodes = 2
7037     else:
7038       self.required_nodes = 1
7039     request = {
7040       "type": "allocate",
7041       "name": self.name,
7042       "disk_template": self.disk_template,
7043       "tags": self.tags,
7044       "os": self.os,
7045       "vcpus": self.vcpus,
7046       "memory": self.mem_size,
7047       "disks": self.disks,
7048       "disk_space_total": disk_space,
7049       "nics": self.nics,
7050       "required_nodes": self.required_nodes,
7051       }
7052     data["request"] = request
7053
7054   def _AddRelocateInstance(self):
7055     """Add relocate instance data to allocator structure.
7056
7057     This in combination with _IAllocatorGetClusterData will create the
7058     correct structure needed as input for the allocator.
7059
7060     The checks for the completeness of the opcode must have already been
7061     done.
7062
7063     """
7064     instance = self.lu.cfg.GetInstanceInfo(self.name)
7065     if instance is None:
7066       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7067                                    " IAllocator" % self.name)
7068
7069     if instance.disk_template not in constants.DTS_NET_MIRROR:
7070       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7071
7072     if len(instance.secondary_nodes) != 1:
7073       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7074
7075     self.required_nodes = 1
7076     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7077     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7078
7079     request = {
7080       "type": "relocate",
7081       "name": self.name,
7082       "disk_space_total": disk_space,
7083       "required_nodes": self.required_nodes,
7084       "relocate_from": self.relocate_from,
7085       }
7086     self.in_data["request"] = request
7087
7088   def _BuildInputData(self):
7089     """Build input data structures.
7090
7091     """
7092     self._ComputeClusterData()
7093
7094     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7095       self._AddNewInstance()
7096     else:
7097       self._AddRelocateInstance()
7098
7099     self.in_text = serializer.Dump(self.in_data)
7100
7101   def Run(self, name, validate=True, call_fn=None):
7102     """Run an instance allocator and return the results.
7103
7104     """
7105     if call_fn is None:
7106       call_fn = self.lu.rpc.call_iallocator_runner
7107
7108     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7109     result.Raise()
7110
7111     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7112       raise errors.OpExecError("Invalid result from master iallocator runner")
7113
7114     rcode, stdout, stderr, fail = result.data
7115
7116     if rcode == constants.IARUN_NOTFOUND:
7117       raise errors.OpExecError("Can't find allocator '%s'" % name)
7118     elif rcode == constants.IARUN_FAILURE:
7119       raise errors.OpExecError("Instance allocator call failed: %s,"
7120                                " output: %s" % (fail, stdout+stderr))
7121     self.out_text = stdout
7122     if validate:
7123       self._ValidateResult()
7124
7125   def _ValidateResult(self):
7126     """Process the allocator results.
7127
7128     This will process and if successful save the result in
7129     self.out_data and the other parameters.
7130
7131     """
7132     try:
7133       rdict = serializer.Load(self.out_text)
7134     except Exception, err:
7135       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7136
7137     if not isinstance(rdict, dict):
7138       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7139
7140     for key in "success", "info", "nodes":
7141       if key not in rdict:
7142         raise errors.OpExecError("Can't parse iallocator results:"
7143                                  " missing key '%s'" % key)
7144       setattr(self, key, rdict[key])
7145
7146     if not isinstance(rdict["nodes"], list):
7147       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7148                                " is not a list")
7149     self.out_data = rdict
7150
7151
7152 class LUTestAllocator(NoHooksLU):
7153   """Run allocator tests.
7154
7155   This LU runs the allocator tests
7156
7157   """
7158   _OP_REQP = ["direction", "mode", "name"]
7159
7160   def CheckPrereq(self):
7161     """Check prerequisites.
7162
7163     This checks the opcode parameters depending on the director and mode test.
7164
7165     """
7166     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7167       for attr in ["name", "mem_size", "disks", "disk_template",
7168                    "os", "tags", "nics", "vcpus"]:
7169         if not hasattr(self.op, attr):
7170           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7171                                      attr)
7172       iname = self.cfg.ExpandInstanceName(self.op.name)
7173       if iname is not None:
7174         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7175                                    iname)
7176       if not isinstance(self.op.nics, list):
7177         raise errors.OpPrereqError("Invalid parameter 'nics'")
7178       for row in self.op.nics:
7179         if (not isinstance(row, dict) or
7180             "mac" not in row or
7181             "ip" not in row or
7182             "bridge" not in row):
7183           raise errors.OpPrereqError("Invalid contents of the"
7184                                      " 'nics' parameter")
7185       if not isinstance(self.op.disks, list):
7186         raise errors.OpPrereqError("Invalid parameter 'disks'")
7187       for row in self.op.disks:
7188         if (not isinstance(row, dict) or
7189             "size" not in row or
7190             not isinstance(row["size"], int) or
7191             "mode" not in row or
7192             row["mode"] not in ['r', 'w']):
7193           raise errors.OpPrereqError("Invalid contents of the"
7194                                      " 'disks' parameter")
7195       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7196         self.op.hypervisor = self.cfg.GetHypervisorType()
7197     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7198       if not hasattr(self.op, "name"):
7199         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7200       fname = self.cfg.ExpandInstanceName(self.op.name)
7201       if fname is None:
7202         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7203                                    self.op.name)
7204       self.op.name = fname
7205       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7206     else:
7207       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7208                                  self.op.mode)
7209
7210     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7211       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7212         raise errors.OpPrereqError("Missing allocator name")
7213     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7214       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7215                                  self.op.direction)
7216
7217   def Exec(self, feedback_fn):
7218     """Run the allocator test.
7219
7220     """
7221     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7222       ial = IAllocator(self,
7223                        mode=self.op.mode,
7224                        name=self.op.name,
7225                        mem_size=self.op.mem_size,
7226                        disks=self.op.disks,
7227                        disk_template=self.op.disk_template,
7228                        os=self.op.os,
7229                        tags=self.op.tags,
7230                        nics=self.op.nics,
7231                        vcpus=self.op.vcpus,
7232                        hypervisor=self.op.hypervisor,
7233                        )
7234     else:
7235       ial = IAllocator(self,
7236                        mode=self.op.mode,
7237                        name=self.op.name,
7238                        relocate_from=list(self.relocate_from),
7239                        )
7240
7241     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7242       result = ial.in_text
7243     else:
7244       ial.Run(self.op.allocator, validate=False)
7245       result = ial.out_text
7246     return result