Fix various pylint warnings
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   """
60   HPATH = None
61   HTYPE = None
62   _OP_REQP = []
63   REQ_BGL = True
64
65   def __init__(self, processor, op, context, rpc):
66     """Constructor for LogicalUnit.
67
68     This needs to be overriden in derived classes in order to check op
69     validity.
70
71     """
72     self.proc = processor
73     self.op = op
74     self.cfg = context.cfg
75     self.context = context
76     self.rpc = rpc
77     # Dicts used to declare locking needs to mcpu
78     self.needed_locks = None
79     self.acquired_locks = {}
80     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
81     self.add_locks = {}
82     self.remove_locks = {}
83     # Used to force good behavior when calling helper functions
84     self.recalculate_locks = {}
85     self.__ssh = None
86     # logging
87     self.LogWarning = processor.LogWarning
88     self.LogInfo = processor.LogInfo
89
90     for attr_name in self._OP_REQP:
91       attr_val = getattr(op, attr_name, None)
92       if attr_val is None:
93         raise errors.OpPrereqError("Required parameter '%s' missing" %
94                                    attr_name)
95     self.CheckArguments()
96
97   def __GetSSH(self):
98     """Returns the SshRunner object
99
100     """
101     if not self.__ssh:
102       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
103     return self.__ssh
104
105   ssh = property(fget=__GetSSH)
106
107   def CheckArguments(self):
108     """Check syntactic validity for the opcode arguments.
109
110     This method is for doing a simple syntactic check and ensure
111     validity of opcode parameters, without any cluster-related
112     checks. While the same can be accomplished in ExpandNames and/or
113     CheckPrereq, doing these separate is better because:
114
115       - ExpandNames is left as as purely a lock-related function
116       - CheckPrereq is run after we have aquired locks (and possible
117         waited for them)
118
119     The function is allowed to change the self.op attribute so that
120     later methods can no longer worry about missing parameters.
121
122     """
123     pass
124
125   def ExpandNames(self):
126     """Expand names for this LU.
127
128     This method is called before starting to execute the opcode, and it should
129     update all the parameters of the opcode to their canonical form (e.g. a
130     short node name must be fully expanded after this method has successfully
131     completed). This way locking, hooks, logging, ecc. can work correctly.
132
133     LUs which implement this method must also populate the self.needed_locks
134     member, as a dict with lock levels as keys, and a list of needed lock names
135     as values. Rules:
136
137       - use an empty dict if you don't need any lock
138       - if you don't need any lock at a particular level omit that level
139       - don't put anything for the BGL level
140       - if you want all locks at a level use locking.ALL_SET as a value
141
142     If you need to share locks (rather than acquire them exclusively) at one
143     level you can modify self.share_locks, setting a true value (usually 1) for
144     that level. By default locks are not shared.
145
146     Examples::
147
148       # Acquire all nodes and one instance
149       self.needed_locks = {
150         locking.LEVEL_NODE: locking.ALL_SET,
151         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
152       }
153       # Acquire just two nodes
154       self.needed_locks = {
155         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
156       }
157       # Acquire no locks
158       self.needed_locks = {} # No, you can't leave it to the default value None
159
160     """
161     # The implementation of this method is mandatory only if the new LU is
162     # concurrent, so that old LUs don't need to be changed all at the same
163     # time.
164     if self.REQ_BGL:
165       self.needed_locks = {} # Exclusive LUs don't need locks.
166     else:
167       raise NotImplementedError
168
169   def DeclareLocks(self, level):
170     """Declare LU locking needs for a level
171
172     While most LUs can just declare their locking needs at ExpandNames time,
173     sometimes there's the need to calculate some locks after having acquired
174     the ones before. This function is called just before acquiring locks at a
175     particular level, but after acquiring the ones at lower levels, and permits
176     such calculations. It can be used to modify self.needed_locks, and by
177     default it does nothing.
178
179     This function is only called if you have something already set in
180     self.needed_locks for the level.
181
182     @param level: Locking level which is going to be locked
183     @type level: member of ganeti.locking.LEVELS
184
185     """
186
187   def CheckPrereq(self):
188     """Check prerequisites for this LU.
189
190     This method should check that the prerequisites for the execution
191     of this LU are fulfilled. It can do internode communication, but
192     it should be idempotent - no cluster or system changes are
193     allowed.
194
195     The method should raise errors.OpPrereqError in case something is
196     not fulfilled. Its return value is ignored.
197
198     This method should also update all the parameters of the opcode to
199     their canonical form if it hasn't been done by ExpandNames before.
200
201     """
202     raise NotImplementedError
203
204   def Exec(self, feedback_fn):
205     """Execute the LU.
206
207     This method should implement the actual work. It should raise
208     errors.OpExecError for failures that are somewhat dealt with in
209     code, or expected.
210
211     """
212     raise NotImplementedError
213
214   def BuildHooksEnv(self):
215     """Build hooks environment for this LU.
216
217     This method should return a three-node tuple consisting of: a dict
218     containing the environment that will be used for running the
219     specific hook for this LU, a list of node names on which the hook
220     should run before the execution, and a list of node names on which
221     the hook should run after the execution.
222
223     The keys of the dict must not have 'GANETI_' prefixed as this will
224     be handled in the hooks runner. Also note additional keys will be
225     added by the hooks runner. If the LU doesn't define any
226     environment, an empty dict (and not None) should be returned.
227
228     No nodes should be returned as an empty list (and not None).
229
230     Note that if the HPATH for a LU class is None, this function will
231     not be called.
232
233     """
234     raise NotImplementedError
235
236   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
237     """Notify the LU about the results of its hooks.
238
239     This method is called every time a hooks phase is executed, and notifies
240     the Logical Unit about the hooks' result. The LU can then use it to alter
241     its result based on the hooks.  By default the method does nothing and the
242     previous result is passed back unchanged but any LU can define it if it
243     wants to use the local cluster hook-scripts somehow.
244
245     @param phase: one of L{constants.HOOKS_PHASE_POST} or
246         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
247     @param hook_results: the results of the multi-node hooks rpc call
248     @param feedback_fn: function used send feedback back to the caller
249     @param lu_result: the previous Exec result this LU had, or None
250         in the PRE phase
251     @return: the new Exec result, based on the previous result
252         and hook results
253
254     """
255     return lu_result
256
257   def _ExpandAndLockInstance(self):
258     """Helper function to expand and lock an instance.
259
260     Many LUs that work on an instance take its name in self.op.instance_name
261     and need to expand it and then declare the expanded name for locking. This
262     function does it, and then updates self.op.instance_name to the expanded
263     name. It also initializes needed_locks as a dict, if this hasn't been done
264     before.
265
266     """
267     if self.needed_locks is None:
268       self.needed_locks = {}
269     else:
270       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
271         "_ExpandAndLockInstance called with instance-level locks set"
272     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
273     if expanded_name is None:
274       raise errors.OpPrereqError("Instance '%s' not known" %
275                                   self.op.instance_name)
276     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
277     self.op.instance_name = expanded_name
278
279   def _LockInstancesNodes(self, primary_only=False):
280     """Helper function to declare instances' nodes for locking.
281
282     This function should be called after locking one or more instances to lock
283     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
284     with all primary or secondary nodes for instances already locked and
285     present in self.needed_locks[locking.LEVEL_INSTANCE].
286
287     It should be called from DeclareLocks, and for safety only works if
288     self.recalculate_locks[locking.LEVEL_NODE] is set.
289
290     In the future it may grow parameters to just lock some instance's nodes, or
291     to just lock primaries or secondary nodes, if needed.
292
293     If should be called in DeclareLocks in a way similar to::
294
295       if level == locking.LEVEL_NODE:
296         self._LockInstancesNodes()
297
298     @type primary_only: boolean
299     @param primary_only: only lock primary nodes of locked instances
300
301     """
302     assert locking.LEVEL_NODE in self.recalculate_locks, \
303       "_LockInstancesNodes helper function called with no nodes to recalculate"
304
305     # TODO: check if we're really been called with the instance locks held
306
307     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
308     # future we might want to have different behaviors depending on the value
309     # of self.recalculate_locks[locking.LEVEL_NODE]
310     wanted_nodes = []
311     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
312       instance = self.context.cfg.GetInstanceInfo(instance_name)
313       wanted_nodes.append(instance.primary_node)
314       if not primary_only:
315         wanted_nodes.extend(instance.secondary_nodes)
316
317     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
318       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
319     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
320       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
321
322     del self.recalculate_locks[locking.LEVEL_NODE]
323
324
325 class NoHooksLU(LogicalUnit):
326   """Simple LU which runs no hooks.
327
328   This LU is intended as a parent for other LogicalUnits which will
329   run no hooks, in order to reduce duplicate code.
330
331   """
332   HPATH = None
333   HTYPE = None
334
335
336 def _GetWantedNodes(lu, nodes):
337   """Returns list of checked and expanded node names.
338
339   @type lu: L{LogicalUnit}
340   @param lu: the logical unit on whose behalf we execute
341   @type nodes: list
342   @param nodes: list of node names or None for all nodes
343   @rtype: list
344   @return: the list of nodes, sorted
345   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
346
347   """
348   if not isinstance(nodes, list):
349     raise errors.OpPrereqError("Invalid argument type 'nodes'")
350
351   if not nodes:
352     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
353       " non-empty list of nodes whose name is to be expanded.")
354
355   wanted = []
356   for name in nodes:
357     node = lu.cfg.ExpandNodeName(name)
358     if node is None:
359       raise errors.OpPrereqError("No such node name '%s'" % name)
360     wanted.append(node)
361
362   return utils.NiceSort(wanted)
363
364
365 def _GetWantedInstances(lu, instances):
366   """Returns list of checked and expanded instance names.
367
368   @type lu: L{LogicalUnit}
369   @param lu: the logical unit on whose behalf we execute
370   @type instances: list
371   @param instances: list of instance names or None for all instances
372   @rtype: list
373   @return: the list of instances, sorted
374   @raise errors.OpPrereqError: if the instances parameter is wrong type
375   @raise errors.OpPrereqError: if any of the passed instances is not found
376
377   """
378   if not isinstance(instances, list):
379     raise errors.OpPrereqError("Invalid argument type 'instances'")
380
381   if instances:
382     wanted = []
383
384     for name in instances:
385       instance = lu.cfg.ExpandInstanceName(name)
386       if instance is None:
387         raise errors.OpPrereqError("No such instance name '%s'" % name)
388       wanted.append(instance)
389
390   else:
391     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
392   return wanted
393
394
395 def _CheckOutputFields(static, dynamic, selected):
396   """Checks whether all selected fields are valid.
397
398   @type static: L{utils.FieldSet}
399   @param static: static fields set
400   @type dynamic: L{utils.FieldSet}
401   @param dynamic: dynamic fields set
402
403   """
404   f = utils.FieldSet()
405   f.Extend(static)
406   f.Extend(dynamic)
407
408   delta = f.NonMatching(selected)
409   if delta:
410     raise errors.OpPrereqError("Unknown output fields selected: %s"
411                                % ",".join(delta))
412
413
414 def _CheckBooleanOpField(op, name):
415   """Validates boolean opcode parameters.
416
417   This will ensure that an opcode parameter is either a boolean value,
418   or None (but that it always exists).
419
420   """
421   val = getattr(op, name, None)
422   if not (val is None or isinstance(val, bool)):
423     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
424                                (name, str(val)))
425   setattr(op, name, val)
426
427
428 def _CheckNodeOnline(lu, node):
429   """Ensure that a given node is online.
430
431   @param lu: the LU on behalf of which we make the check
432   @param node: the node to check
433   @raise errors.OpPrereqError: if the node is offline
434
435   """
436   if lu.cfg.GetNodeInfo(node).offline:
437     raise errors.OpPrereqError("Can't use offline node %s" % node)
438
439
440 def _CheckNodeNotDrained(lu, node):
441   """Ensure that a given node is not drained.
442
443   @param lu: the LU on behalf of which we make the check
444   @param node: the node to check
445   @raise errors.OpPrereqError: if the node is drained
446
447   """
448   if lu.cfg.GetNodeInfo(node).drained:
449     raise errors.OpPrereqError("Can't use drained node %s" % node)
450
451
452 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
453                           memory, vcpus, nics, disk_template, disks,
454                           bep, hvp, hypervisor):
455   """Builds instance related env variables for hooks
456
457   This builds the hook environment from individual variables.
458
459   @type name: string
460   @param name: the name of the instance
461   @type primary_node: string
462   @param primary_node: the name of the instance's primary node
463   @type secondary_nodes: list
464   @param secondary_nodes: list of secondary nodes as strings
465   @type os_type: string
466   @param os_type: the name of the instance's OS
467   @type status: boolean
468   @param status: the should_run status of the instance
469   @type memory: string
470   @param memory: the memory size of the instance
471   @type vcpus: string
472   @param vcpus: the count of VCPUs the instance has
473   @type nics: list
474   @param nics: list of tuples (ip, mac, mode, link) representing
475       the NICs the instance has
476   @type disk_template: string
477   @param disk_template: the distk template of the instance
478   @type disks: list
479   @param disks: the list of (size, mode) pairs
480   @type bep: dict
481   @param bep: the backend parameters for the instance
482   @type hvp: dict
483   @param hvp: the hypervisor parameters for the instance
484   @type hypervisor: string
485   @param hypervisor: the hypervisor for the instance
486   @rtype: dict
487   @return: the hook environment for this instance
488
489   """
490   if status:
491     str_status = "up"
492   else:
493     str_status = "down"
494   env = {
495     "OP_TARGET": name,
496     "INSTANCE_NAME": name,
497     "INSTANCE_PRIMARY": primary_node,
498     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
499     "INSTANCE_OS_TYPE": os_type,
500     "INSTANCE_STATUS": str_status,
501     "INSTANCE_MEMORY": memory,
502     "INSTANCE_VCPUS": vcpus,
503     "INSTANCE_DISK_TEMPLATE": disk_template,
504     "INSTANCE_HYPERVISOR": hypervisor,
505   }
506
507   if nics:
508     nic_count = len(nics)
509     for idx, (ip, mac, mode, link) in enumerate(nics):
510       if ip is None:
511         ip = ""
512       env["INSTANCE_NIC%d_IP" % idx] = ip
513       env["INSTANCE_NIC%d_MAC" % idx] = mac
514       env["INSTANCE_NIC%d_MODE" % idx] = mode
515       env["INSTANCE_NIC%d_LINK" % idx] = link
516       if mode == constants.NIC_MODE_BRIDGED:
517         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
518   else:
519     nic_count = 0
520
521   env["INSTANCE_NIC_COUNT"] = nic_count
522
523   if disks:
524     disk_count = len(disks)
525     for idx, (size, mode) in enumerate(disks):
526       env["INSTANCE_DISK%d_SIZE" % idx] = size
527       env["INSTANCE_DISK%d_MODE" % idx] = mode
528   else:
529     disk_count = 0
530
531   env["INSTANCE_DISK_COUNT"] = disk_count
532
533   for source, kind in [(bep, "BE"), (hvp, "HV")]:
534     for key, value in source.items():
535       env["INSTANCE_%s_%s" % (kind, key)] = value
536
537   return env
538
539 def _NICListToTuple(lu, nics):
540   """Build a list of nic information tuples.
541
542   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
543   value in LUQueryInstanceData.
544
545   @type lu:  L{LogicalUnit}
546   @param lu: the logical unit on whose behalf we execute
547   @type nics: list of L{objects.NIC}
548   @param nics: list of nics to convert to hooks tuples
549
550   """
551   hooks_nics = []
552   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
553   for nic in nics:
554     ip = nic.ip
555     mac = nic.mac
556     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
557     mode = filled_params[constants.NIC_MODE]
558     link = filled_params[constants.NIC_LINK]
559     hooks_nics.append((ip, mac, mode, link))
560   return hooks_nics
561
562 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
563   """Builds instance related env variables for hooks from an object.
564
565   @type lu: L{LogicalUnit}
566   @param lu: the logical unit on whose behalf we execute
567   @type instance: L{objects.Instance}
568   @param instance: the instance for which we should build the
569       environment
570   @type override: dict
571   @param override: dictionary with key/values that will override
572       our values
573   @rtype: dict
574   @return: the hook environment dictionary
575
576   """
577   cluster = lu.cfg.GetClusterInfo()
578   bep = cluster.FillBE(instance)
579   hvp = cluster.FillHV(instance)
580   args = {
581     'name': instance.name,
582     'primary_node': instance.primary_node,
583     'secondary_nodes': instance.secondary_nodes,
584     'os_type': instance.os,
585     'status': instance.admin_up,
586     'memory': bep[constants.BE_MEMORY],
587     'vcpus': bep[constants.BE_VCPUS],
588     'nics': _NICListToTuple(lu, instance.nics),
589     'disk_template': instance.disk_template,
590     'disks': [(disk.size, disk.mode) for disk in instance.disks],
591     'bep': bep,
592     'hvp': hvp,
593     'hypervisor': instance.hypervisor,
594   }
595   if override:
596     args.update(override)
597   return _BuildInstanceHookEnv(**args)
598
599
600 def _AdjustCandidatePool(lu):
601   """Adjust the candidate pool after node operations.
602
603   """
604   mod_list = lu.cfg.MaintainCandidatePool()
605   if mod_list:
606     lu.LogInfo("Promoted nodes to master candidate role: %s",
607                ", ".join(node.name for node in mod_list))
608     for name in mod_list:
609       lu.context.ReaddNode(name)
610   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
611   if mc_now > mc_max:
612     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
613                (mc_now, mc_max))
614
615
616 def _CheckNicsBridgesExist(lu, target_nics, target_node,
617                                profile=constants.PP_DEFAULT):
618   """Check that the brigdes needed by a list of nics exist.
619
620   """
621   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
622   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
623                 for nic in target_nics]
624   brlist = [params[constants.NIC_LINK] for params in paramslist
625             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
626   if brlist:
627     result = lu.rpc.call_bridges_exist(target_node, brlist)
628     result.Raise("Error checking bridges on destination node '%s'" %
629                  target_node, prereq=True)
630
631
632 def _CheckInstanceBridgesExist(lu, instance, node=None):
633   """Check that the brigdes needed by an instance exist.
634
635   """
636   if node is None:
637     node = instance.primary_node
638   _CheckNicsBridgesExist(lu, instance.nics, node)
639
640
641 class LUDestroyCluster(NoHooksLU):
642   """Logical unit for destroying the cluster.
643
644   """
645   _OP_REQP = []
646
647   def CheckPrereq(self):
648     """Check prerequisites.
649
650     This checks whether the cluster is empty.
651
652     Any errors are signalled by raising errors.OpPrereqError.
653
654     """
655     master = self.cfg.GetMasterNode()
656
657     nodelist = self.cfg.GetNodeList()
658     if len(nodelist) != 1 or nodelist[0] != master:
659       raise errors.OpPrereqError("There are still %d node(s) in"
660                                  " this cluster." % (len(nodelist) - 1))
661     instancelist = self.cfg.GetInstanceList()
662     if instancelist:
663       raise errors.OpPrereqError("There are still %d instance(s) in"
664                                  " this cluster." % len(instancelist))
665
666   def Exec(self, feedback_fn):
667     """Destroys the cluster.
668
669     """
670     master = self.cfg.GetMasterNode()
671     result = self.rpc.call_node_stop_master(master, False)
672     result.Raise("Could not disable the master role")
673     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
674     utils.CreateBackup(priv_key)
675     utils.CreateBackup(pub_key)
676     return master
677
678
679 class LUVerifyCluster(LogicalUnit):
680   """Verifies the cluster status.
681
682   """
683   HPATH = "cluster-verify"
684   HTYPE = constants.HTYPE_CLUSTER
685   _OP_REQP = ["skip_checks"]
686   REQ_BGL = False
687
688   def ExpandNames(self):
689     self.needed_locks = {
690       locking.LEVEL_NODE: locking.ALL_SET,
691       locking.LEVEL_INSTANCE: locking.ALL_SET,
692     }
693     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
694
695   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
696                   node_result, feedback_fn, master_files,
697                   drbd_map, vg_name):
698     """Run multiple tests against a node.
699
700     Test list:
701
702       - compares ganeti version
703       - checks vg existance and size > 20G
704       - checks config file checksum
705       - checks ssh to other nodes
706
707     @type nodeinfo: L{objects.Node}
708     @param nodeinfo: the node to check
709     @param file_list: required list of files
710     @param local_cksum: dictionary of local files and their checksums
711     @param node_result: the results from the node
712     @param feedback_fn: function used to accumulate results
713     @param master_files: list of files that only masters should have
714     @param drbd_map: the useddrbd minors for this node, in
715         form of minor: (instance, must_exist) which correspond to instances
716         and their running status
717     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
718
719     """
720     node = nodeinfo.name
721
722     # main result, node_result should be a non-empty dict
723     if not node_result or not isinstance(node_result, dict):
724       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
725       return True
726
727     # compares ganeti version
728     local_version = constants.PROTOCOL_VERSION
729     remote_version = node_result.get('version', None)
730     if not (remote_version and isinstance(remote_version, (list, tuple)) and
731             len(remote_version) == 2):
732       feedback_fn("  - ERROR: connection to %s failed" % (node))
733       return True
734
735     if local_version != remote_version[0]:
736       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
737                   " node %s %s" % (local_version, node, remote_version[0]))
738       return True
739
740     # node seems compatible, we can actually try to look into its results
741
742     bad = False
743
744     # full package version
745     if constants.RELEASE_VERSION != remote_version[1]:
746       feedback_fn("  - WARNING: software version mismatch: master %s,"
747                   " node %s %s" %
748                   (constants.RELEASE_VERSION, node, remote_version[1]))
749
750     # checks vg existence and size > 20G
751     if vg_name is not None:
752       vglist = node_result.get(constants.NV_VGLIST, None)
753       if not vglist:
754         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
755                         (node,))
756         bad = True
757       else:
758         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
759                                               constants.MIN_VG_SIZE)
760         if vgstatus:
761           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
762           bad = True
763
764     # checks config file checksum
765
766     remote_cksum = node_result.get(constants.NV_FILELIST, None)
767     if not isinstance(remote_cksum, dict):
768       bad = True
769       feedback_fn("  - ERROR: node hasn't returned file checksum data")
770     else:
771       for file_name in file_list:
772         node_is_mc = nodeinfo.master_candidate
773         must_have_file = file_name not in master_files
774         if file_name not in remote_cksum:
775           if node_is_mc or must_have_file:
776             bad = True
777             feedback_fn("  - ERROR: file '%s' missing" % file_name)
778         elif remote_cksum[file_name] != local_cksum[file_name]:
779           if node_is_mc or must_have_file:
780             bad = True
781             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
782           else:
783             # not candidate and this is not a must-have file
784             bad = True
785             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
786                         " '%s'" % file_name)
787         else:
788           # all good, except non-master/non-must have combination
789           if not node_is_mc and not must_have_file:
790             feedback_fn("  - ERROR: file '%s' should not exist on non master"
791                         " candidates" % file_name)
792
793     # checks ssh to any
794
795     if constants.NV_NODELIST not in node_result:
796       bad = True
797       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
798     else:
799       if node_result[constants.NV_NODELIST]:
800         bad = True
801         for node in node_result[constants.NV_NODELIST]:
802           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODELIST][node]))
804
805     if constants.NV_NODENETTEST not in node_result:
806       bad = True
807       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
808     else:
809       if node_result[constants.NV_NODENETTEST]:
810         bad = True
811         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
812         for node in nlist:
813           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
814                           (node, node_result[constants.NV_NODENETTEST][node]))
815
816     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
817     if isinstance(hyp_result, dict):
818       for hv_name, hv_result in hyp_result.iteritems():
819         if hv_result is not None:
820           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
821                       (hv_name, hv_result))
822
823     # check used drbd list
824     if vg_name is not None:
825       used_minors = node_result.get(constants.NV_DRBDLIST, [])
826       if not isinstance(used_minors, (tuple, list)):
827         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
828                     str(used_minors))
829       else:
830         for minor, (iname, must_exist) in drbd_map.items():
831           if minor not in used_minors and must_exist:
832             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
833                         " not active" % (minor, iname))
834             bad = True
835         for minor in used_minors:
836           if minor not in drbd_map:
837             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
838                         minor)
839             bad = True
840
841     return bad
842
843   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
844                       node_instance, feedback_fn, n_offline):
845     """Verify an instance.
846
847     This function checks to see if the required block devices are
848     available on the instance's node.
849
850     """
851     bad = False
852
853     node_current = instanceconfig.primary_node
854
855     node_vol_should = {}
856     instanceconfig.MapLVsByNode(node_vol_should)
857
858     for node in node_vol_should:
859       if node in n_offline:
860         # ignore missing volumes on offline nodes
861         continue
862       for volume in node_vol_should[node]:
863         if node not in node_vol_is or volume not in node_vol_is[node]:
864           feedback_fn("  - ERROR: volume %s missing on node %s" %
865                           (volume, node))
866           bad = True
867
868     if instanceconfig.admin_up:
869       if ((node_current not in node_instance or
870           not instance in node_instance[node_current]) and
871           node_current not in n_offline):
872         feedback_fn("  - ERROR: instance %s not running on node %s" %
873                         (instance, node_current))
874         bad = True
875
876     for node in node_instance:
877       if (not node == node_current):
878         if instance in node_instance[node]:
879           feedback_fn("  - ERROR: instance %s should not run on node %s" %
880                           (instance, node))
881           bad = True
882
883     return bad
884
885   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
886     """Verify if there are any unknown volumes in the cluster.
887
888     The .os, .swap and backup volumes are ignored. All other volumes are
889     reported as unknown.
890
891     """
892     bad = False
893
894     for node in node_vol_is:
895       for volume in node_vol_is[node]:
896         if node not in node_vol_should or volume not in node_vol_should[node]:
897           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
898                       (volume, node))
899           bad = True
900     return bad
901
902   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
903     """Verify the list of running instances.
904
905     This checks what instances are running but unknown to the cluster.
906
907     """
908     bad = False
909     for node in node_instance:
910       for runninginstance in node_instance[node]:
911         if runninginstance not in instancelist:
912           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
913                           (runninginstance, node))
914           bad = True
915     return bad
916
917   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
918     """Verify N+1 Memory Resilience.
919
920     Check that if one single node dies we can still start all the instances it
921     was primary for.
922
923     """
924     bad = False
925
926     for node, nodeinfo in node_info.iteritems():
927       # This code checks that every node which is now listed as secondary has
928       # enough memory to host all instances it is supposed to should a single
929       # other node in the cluster fail.
930       # FIXME: not ready for failover to an arbitrary node
931       # FIXME: does not support file-backed instances
932       # WARNING: we currently take into account down instances as well as up
933       # ones, considering that even if they're down someone might want to start
934       # them even in the event of a node failure.
935       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
936         needed_mem = 0
937         for instance in instances:
938           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
939           if bep[constants.BE_AUTO_BALANCE]:
940             needed_mem += bep[constants.BE_MEMORY]
941         if nodeinfo['mfree'] < needed_mem:
942           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
943                       " failovers should node %s fail" % (node, prinode))
944           bad = True
945     return bad
946
947   def CheckPrereq(self):
948     """Check prerequisites.
949
950     Transform the list of checks we're going to skip into a set and check that
951     all its members are valid.
952
953     """
954     self.skip_set = frozenset(self.op.skip_checks)
955     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
956       raise errors.OpPrereqError("Invalid checks to be skipped specified")
957
958   def BuildHooksEnv(self):
959     """Build hooks env.
960
961     Cluster-Verify hooks just rone in the post phase and their failure makes
962     the output be logged in the verify output and the verification to fail.
963
964     """
965     all_nodes = self.cfg.GetNodeList()
966     env = {
967       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
968       }
969     for node in self.cfg.GetAllNodesInfo().values():
970       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
971
972     return env, [], all_nodes
973
974   def Exec(self, feedback_fn):
975     """Verify integrity of cluster, performing various test on nodes.
976
977     """
978     bad = False
979     feedback_fn("* Verifying global settings")
980     for msg in self.cfg.VerifyConfig():
981       feedback_fn("  - ERROR: %s" % msg)
982
983     vg_name = self.cfg.GetVGName()
984     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
985     nodelist = utils.NiceSort(self.cfg.GetNodeList())
986     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
987     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
988     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
989                         for iname in instancelist)
990     i_non_redundant = [] # Non redundant instances
991     i_non_a_balanced = [] # Non auto-balanced instances
992     n_offline = [] # List of offline nodes
993     n_drained = [] # List of nodes being drained
994     node_volume = {}
995     node_instance = {}
996     node_info = {}
997     instance_cfg = {}
998
999     # FIXME: verify OS list
1000     # do local checksums
1001     master_files = [constants.CLUSTER_CONF_FILE]
1002
1003     file_names = ssconf.SimpleStore().GetFileList()
1004     file_names.append(constants.SSL_CERT_FILE)
1005     file_names.append(constants.RAPI_CERT_FILE)
1006     file_names.extend(master_files)
1007
1008     local_checksums = utils.FingerprintFiles(file_names)
1009
1010     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1011     node_verify_param = {
1012       constants.NV_FILELIST: file_names,
1013       constants.NV_NODELIST: [node.name for node in nodeinfo
1014                               if not node.offline],
1015       constants.NV_HYPERVISOR: hypervisors,
1016       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1017                                   node.secondary_ip) for node in nodeinfo
1018                                  if not node.offline],
1019       constants.NV_INSTANCELIST: hypervisors,
1020       constants.NV_VERSION: None,
1021       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1022       }
1023     if vg_name is not None:
1024       node_verify_param[constants.NV_VGLIST] = None
1025       node_verify_param[constants.NV_LVLIST] = vg_name
1026       node_verify_param[constants.NV_DRBDLIST] = None
1027     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1028                                            self.cfg.GetClusterName())
1029
1030     cluster = self.cfg.GetClusterInfo()
1031     master_node = self.cfg.GetMasterNode()
1032     all_drbd_map = self.cfg.ComputeDRBDMap()
1033
1034     for node_i in nodeinfo:
1035       node = node_i.name
1036
1037       if node_i.offline:
1038         feedback_fn("* Skipping offline node %s" % (node,))
1039         n_offline.append(node)
1040         continue
1041
1042       if node == master_node:
1043         ntype = "master"
1044       elif node_i.master_candidate:
1045         ntype = "master candidate"
1046       elif node_i.drained:
1047         ntype = "drained"
1048         n_drained.append(node)
1049       else:
1050         ntype = "regular"
1051       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1052
1053       msg = all_nvinfo[node].fail_msg
1054       if msg:
1055         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1056         bad = True
1057         continue
1058
1059       nresult = all_nvinfo[node].payload
1060       node_drbd = {}
1061       for minor, instance in all_drbd_map[node].items():
1062         if instance not in instanceinfo:
1063           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1064                       instance)
1065           # ghost instance should not be running, but otherwise we
1066           # don't give double warnings (both ghost instance and
1067           # unallocated minor in use)
1068           node_drbd[minor] = (instance, False)
1069         else:
1070           instance = instanceinfo[instance]
1071           node_drbd[minor] = (instance.name, instance.admin_up)
1072       result = self._VerifyNode(node_i, file_names, local_checksums,
1073                                 nresult, feedback_fn, master_files,
1074                                 node_drbd, vg_name)
1075       bad = bad or result
1076
1077       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1078       if vg_name is None:
1079         node_volume[node] = {}
1080       elif isinstance(lvdata, basestring):
1081         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1082                     (node, utils.SafeEncode(lvdata)))
1083         bad = True
1084         node_volume[node] = {}
1085       elif not isinstance(lvdata, dict):
1086         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1087         bad = True
1088         continue
1089       else:
1090         node_volume[node] = lvdata
1091
1092       # node_instance
1093       idata = nresult.get(constants.NV_INSTANCELIST, None)
1094       if not isinstance(idata, list):
1095         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1096                     (node,))
1097         bad = True
1098         continue
1099
1100       node_instance[node] = idata
1101
1102       # node_info
1103       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1104       if not isinstance(nodeinfo, dict):
1105         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1106         bad = True
1107         continue
1108
1109       try:
1110         node_info[node] = {
1111           "mfree": int(nodeinfo['memory_free']),
1112           "pinst": [],
1113           "sinst": [],
1114           # dictionary holding all instances this node is secondary for,
1115           # grouped by their primary node. Each key is a cluster node, and each
1116           # value is a list of instances which have the key as primary and the
1117           # current node as secondary.  this is handy to calculate N+1 memory
1118           # availability if you can only failover from a primary to its
1119           # secondary.
1120           "sinst-by-pnode": {},
1121         }
1122         # FIXME: devise a free space model for file based instances as well
1123         if vg_name is not None:
1124           if (constants.NV_VGLIST not in nresult or
1125               vg_name not in nresult[constants.NV_VGLIST]):
1126             feedback_fn("  - ERROR: node %s didn't return data for the"
1127                         " volume group '%s' - it is either missing or broken" %
1128                         (node, vg_name))
1129             bad = True
1130             continue
1131           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1132       except (ValueError, KeyError):
1133         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1134                     " from node %s" % (node,))
1135         bad = True
1136         continue
1137
1138     node_vol_should = {}
1139
1140     for instance in instancelist:
1141       feedback_fn("* Verifying instance %s" % instance)
1142       inst_config = instanceinfo[instance]
1143       result =  self._VerifyInstance(instance, inst_config, node_volume,
1144                                      node_instance, feedback_fn, n_offline)
1145       bad = bad or result
1146       inst_nodes_offline = []
1147
1148       inst_config.MapLVsByNode(node_vol_should)
1149
1150       instance_cfg[instance] = inst_config
1151
1152       pnode = inst_config.primary_node
1153       if pnode in node_info:
1154         node_info[pnode]['pinst'].append(instance)
1155       elif pnode not in n_offline:
1156         feedback_fn("  - ERROR: instance %s, connection to primary node"
1157                     " %s failed" % (instance, pnode))
1158         bad = True
1159
1160       if pnode in n_offline:
1161         inst_nodes_offline.append(pnode)
1162
1163       # If the instance is non-redundant we cannot survive losing its primary
1164       # node, so we are not N+1 compliant. On the other hand we have no disk
1165       # templates with more than one secondary so that situation is not well
1166       # supported either.
1167       # FIXME: does not support file-backed instances
1168       if len(inst_config.secondary_nodes) == 0:
1169         i_non_redundant.append(instance)
1170       elif len(inst_config.secondary_nodes) > 1:
1171         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1172                     % instance)
1173
1174       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1175         i_non_a_balanced.append(instance)
1176
1177       for snode in inst_config.secondary_nodes:
1178         if snode in node_info:
1179           node_info[snode]['sinst'].append(instance)
1180           if pnode not in node_info[snode]['sinst-by-pnode']:
1181             node_info[snode]['sinst-by-pnode'][pnode] = []
1182           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1183         elif snode not in n_offline:
1184           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1185                       " %s failed" % (instance, snode))
1186           bad = True
1187         if snode in n_offline:
1188           inst_nodes_offline.append(snode)
1189
1190       if inst_nodes_offline:
1191         # warn that the instance lives on offline nodes, and set bad=True
1192         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1193                     ", ".join(inst_nodes_offline))
1194         bad = True
1195
1196     feedback_fn("* Verifying orphan volumes")
1197     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1198                                        feedback_fn)
1199     bad = bad or result
1200
1201     feedback_fn("* Verifying remaining instances")
1202     result = self._VerifyOrphanInstances(instancelist, node_instance,
1203                                          feedback_fn)
1204     bad = bad or result
1205
1206     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1207       feedback_fn("* Verifying N+1 Memory redundancy")
1208       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1209       bad = bad or result
1210
1211     feedback_fn("* Other Notes")
1212     if i_non_redundant:
1213       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1214                   % len(i_non_redundant))
1215
1216     if i_non_a_balanced:
1217       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1218                   % len(i_non_a_balanced))
1219
1220     if n_offline:
1221       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1222
1223     if n_drained:
1224       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1225
1226     return not bad
1227
1228   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1229     """Analize the post-hooks' result
1230
1231     This method analyses the hook result, handles it, and sends some
1232     nicely-formatted feedback back to the user.
1233
1234     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1235         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1236     @param hooks_results: the results of the multi-node hooks rpc call
1237     @param feedback_fn: function used send feedback back to the caller
1238     @param lu_result: previous Exec result
1239     @return: the new Exec result, based on the previous result
1240         and hook results
1241
1242     """
1243     # We only really run POST phase hooks, and are only interested in
1244     # their results
1245     if phase == constants.HOOKS_PHASE_POST:
1246       # Used to change hooks' output to proper indentation
1247       indent_re = re.compile('^', re.M)
1248       feedback_fn("* Hooks Results")
1249       if not hooks_results:
1250         feedback_fn("  - ERROR: general communication failure")
1251         lu_result = 1
1252       else:
1253         for node_name in hooks_results:
1254           show_node_header = True
1255           res = hooks_results[node_name]
1256           msg = res.fail_msg
1257           if msg:
1258             if res.offline:
1259               # no need to warn or set fail return value
1260               continue
1261             feedback_fn("    Communication failure in hooks execution: %s" %
1262                         msg)
1263             lu_result = 1
1264             continue
1265           for script, hkr, output in res.payload:
1266             if hkr == constants.HKR_FAIL:
1267               # The node header is only shown once, if there are
1268               # failing hooks on that node
1269               if show_node_header:
1270                 feedback_fn("  Node %s:" % node_name)
1271                 show_node_header = False
1272               feedback_fn("    ERROR: Script %s failed, output:" % script)
1273               output = indent_re.sub('      ', output)
1274               feedback_fn("%s" % output)
1275               lu_result = 1
1276
1277       return lu_result
1278
1279
1280 class LUVerifyDisks(NoHooksLU):
1281   """Verifies the cluster disks status.
1282
1283   """
1284   _OP_REQP = []
1285   REQ_BGL = False
1286
1287   def ExpandNames(self):
1288     self.needed_locks = {
1289       locking.LEVEL_NODE: locking.ALL_SET,
1290       locking.LEVEL_INSTANCE: locking.ALL_SET,
1291     }
1292     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1293
1294   def CheckPrereq(self):
1295     """Check prerequisites.
1296
1297     This has no prerequisites.
1298
1299     """
1300     pass
1301
1302   def Exec(self, feedback_fn):
1303     """Verify integrity of cluster disks.
1304
1305     @rtype: tuple of three items
1306     @return: a tuple of (dict of node-to-node_error, list of instances
1307         which need activate-disks, dict of instance: (node, volume) for
1308         missing volumes
1309
1310     """
1311     result = res_nodes, res_instances, res_missing = {}, [], {}
1312
1313     vg_name = self.cfg.GetVGName()
1314     nodes = utils.NiceSort(self.cfg.GetNodeList())
1315     instances = [self.cfg.GetInstanceInfo(name)
1316                  for name in self.cfg.GetInstanceList()]
1317
1318     nv_dict = {}
1319     for inst in instances:
1320       inst_lvs = {}
1321       if (not inst.admin_up or
1322           inst.disk_template not in constants.DTS_NET_MIRROR):
1323         continue
1324       inst.MapLVsByNode(inst_lvs)
1325       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1326       for node, vol_list in inst_lvs.iteritems():
1327         for vol in vol_list:
1328           nv_dict[(node, vol)] = inst
1329
1330     if not nv_dict:
1331       return result
1332
1333     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1334
1335     to_act = set()
1336     for node in nodes:
1337       # node_volume
1338       node_res = node_lvs[node]
1339       if node_res.offline:
1340         continue
1341       msg = node_res.fail_msg
1342       if msg:
1343         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1344         res_nodes[node] = msg
1345         continue
1346
1347       lvs = node_res.payload
1348       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1349         inst = nv_dict.pop((node, lv_name), None)
1350         if (not lv_online and inst is not None
1351             and inst.name not in res_instances):
1352           res_instances.append(inst.name)
1353
1354     # any leftover items in nv_dict are missing LVs, let's arrange the
1355     # data better
1356     for key, inst in nv_dict.iteritems():
1357       if inst.name not in res_missing:
1358         res_missing[inst.name] = []
1359       res_missing[inst.name].append(key)
1360
1361     return result
1362
1363
1364 class LURenameCluster(LogicalUnit):
1365   """Rename the cluster.
1366
1367   """
1368   HPATH = "cluster-rename"
1369   HTYPE = constants.HTYPE_CLUSTER
1370   _OP_REQP = ["name"]
1371
1372   def BuildHooksEnv(self):
1373     """Build hooks env.
1374
1375     """
1376     env = {
1377       "OP_TARGET": self.cfg.GetClusterName(),
1378       "NEW_NAME": self.op.name,
1379       }
1380     mn = self.cfg.GetMasterNode()
1381     return env, [mn], [mn]
1382
1383   def CheckPrereq(self):
1384     """Verify that the passed name is a valid one.
1385
1386     """
1387     hostname = utils.HostInfo(self.op.name)
1388
1389     new_name = hostname.name
1390     self.ip = new_ip = hostname.ip
1391     old_name = self.cfg.GetClusterName()
1392     old_ip = self.cfg.GetMasterIP()
1393     if new_name == old_name and new_ip == old_ip:
1394       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1395                                  " cluster has changed")
1396     if new_ip != old_ip:
1397       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1398         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1399                                    " reachable on the network. Aborting." %
1400                                    new_ip)
1401
1402     self.op.name = new_name
1403
1404   def Exec(self, feedback_fn):
1405     """Rename the cluster.
1406
1407     """
1408     clustername = self.op.name
1409     ip = self.ip
1410
1411     # shutdown the master IP
1412     master = self.cfg.GetMasterNode()
1413     result = self.rpc.call_node_stop_master(master, False)
1414     result.Raise("Could not disable the master role")
1415
1416     try:
1417       cluster = self.cfg.GetClusterInfo()
1418       cluster.cluster_name = clustername
1419       cluster.master_ip = ip
1420       self.cfg.Update(cluster)
1421
1422       # update the known hosts file
1423       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1424       node_list = self.cfg.GetNodeList()
1425       try:
1426         node_list.remove(master)
1427       except ValueError:
1428         pass
1429       result = self.rpc.call_upload_file(node_list,
1430                                          constants.SSH_KNOWN_HOSTS_FILE)
1431       for to_node, to_result in result.iteritems():
1432         msg = to_result.fail_msg
1433         if msg:
1434           msg = ("Copy of file %s to node %s failed: %s" %
1435                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1436           self.proc.LogWarning(msg)
1437
1438     finally:
1439       result = self.rpc.call_node_start_master(master, False)
1440       msg = result.fail_msg
1441       if msg:
1442         self.LogWarning("Could not re-enable the master role on"
1443                         " the master, please restart manually: %s", msg)
1444
1445
1446 def _RecursiveCheckIfLVMBased(disk):
1447   """Check if the given disk or its children are lvm-based.
1448
1449   @type disk: L{objects.Disk}
1450   @param disk: the disk to check
1451   @rtype: booleean
1452   @return: boolean indicating whether a LD_LV dev_type was found or not
1453
1454   """
1455   if disk.children:
1456     for chdisk in disk.children:
1457       if _RecursiveCheckIfLVMBased(chdisk):
1458         return True
1459   return disk.dev_type == constants.LD_LV
1460
1461
1462 class LUSetClusterParams(LogicalUnit):
1463   """Change the parameters of the cluster.
1464
1465   """
1466   HPATH = "cluster-modify"
1467   HTYPE = constants.HTYPE_CLUSTER
1468   _OP_REQP = []
1469   REQ_BGL = False
1470
1471   def CheckArguments(self):
1472     """Check parameters
1473
1474     """
1475     if not hasattr(self.op, "candidate_pool_size"):
1476       self.op.candidate_pool_size = None
1477     if self.op.candidate_pool_size is not None:
1478       try:
1479         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1480       except (ValueError, TypeError), err:
1481         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1482                                    str(err))
1483       if self.op.candidate_pool_size < 1:
1484         raise errors.OpPrereqError("At least one master candidate needed")
1485
1486   def ExpandNames(self):
1487     # FIXME: in the future maybe other cluster params won't require checking on
1488     # all nodes to be modified.
1489     self.needed_locks = {
1490       locking.LEVEL_NODE: locking.ALL_SET,
1491     }
1492     self.share_locks[locking.LEVEL_NODE] = 1
1493
1494   def BuildHooksEnv(self):
1495     """Build hooks env.
1496
1497     """
1498     env = {
1499       "OP_TARGET": self.cfg.GetClusterName(),
1500       "NEW_VG_NAME": self.op.vg_name,
1501       }
1502     mn = self.cfg.GetMasterNode()
1503     return env, [mn], [mn]
1504
1505   def CheckPrereq(self):
1506     """Check prerequisites.
1507
1508     This checks whether the given params don't conflict and
1509     if the given volume group is valid.
1510
1511     """
1512     if self.op.vg_name is not None and not self.op.vg_name:
1513       instances = self.cfg.GetAllInstancesInfo().values()
1514       for inst in instances:
1515         for disk in inst.disks:
1516           if _RecursiveCheckIfLVMBased(disk):
1517             raise errors.OpPrereqError("Cannot disable lvm storage while"
1518                                        " lvm-based instances exist")
1519
1520     node_list = self.acquired_locks[locking.LEVEL_NODE]
1521
1522     # if vg_name not None, checks given volume group on all nodes
1523     if self.op.vg_name:
1524       vglist = self.rpc.call_vg_list(node_list)
1525       for node in node_list:
1526         msg = vglist[node].fail_msg
1527         if msg:
1528           # ignoring down node
1529           self.LogWarning("Error while gathering data on node %s"
1530                           " (ignoring node): %s", node, msg)
1531           continue
1532         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1533                                               self.op.vg_name,
1534                                               constants.MIN_VG_SIZE)
1535         if vgstatus:
1536           raise errors.OpPrereqError("Error on node '%s': %s" %
1537                                      (node, vgstatus))
1538
1539     self.cluster = cluster = self.cfg.GetClusterInfo()
1540     # validate params changes
1541     if self.op.beparams:
1542       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1543       self.new_beparams = objects.FillDict(
1544         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1545
1546     if self.op.nicparams:
1547       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1548       self.new_nicparams = objects.FillDict(
1549         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1550       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1551
1552     # hypervisor list/parameters
1553     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1554     if self.op.hvparams:
1555       if not isinstance(self.op.hvparams, dict):
1556         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1557       for hv_name, hv_dict in self.op.hvparams.items():
1558         if hv_name not in self.new_hvparams:
1559           self.new_hvparams[hv_name] = hv_dict
1560         else:
1561           self.new_hvparams[hv_name].update(hv_dict)
1562
1563     if self.op.enabled_hypervisors is not None:
1564       self.hv_list = self.op.enabled_hypervisors
1565     else:
1566       self.hv_list = cluster.enabled_hypervisors
1567
1568     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1569       # either the enabled list has changed, or the parameters have, validate
1570       for hv_name, hv_params in self.new_hvparams.items():
1571         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1572             (self.op.enabled_hypervisors and
1573              hv_name in self.op.enabled_hypervisors)):
1574           # either this is a new hypervisor, or its parameters have changed
1575           hv_class = hypervisor.GetHypervisor(hv_name)
1576           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1577           hv_class.CheckParameterSyntax(hv_params)
1578           _CheckHVParams(self, node_list, hv_name, hv_params)
1579
1580   def Exec(self, feedback_fn):
1581     """Change the parameters of the cluster.
1582
1583     """
1584     if self.op.vg_name is not None:
1585       new_volume = self.op.vg_name
1586       if not new_volume:
1587         new_volume = None
1588       if new_volume != self.cfg.GetVGName():
1589         self.cfg.SetVGName(new_volume)
1590       else:
1591         feedback_fn("Cluster LVM configuration already in desired"
1592                     " state, not changing")
1593     if self.op.hvparams:
1594       self.cluster.hvparams = self.new_hvparams
1595     if self.op.enabled_hypervisors is not None:
1596       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1597     if self.op.beparams:
1598       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1599     if self.op.nicparams:
1600       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1601
1602     if self.op.candidate_pool_size is not None:
1603       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1604
1605     self.cfg.Update(self.cluster)
1606
1607     # we want to update nodes after the cluster so that if any errors
1608     # happen, we have recorded and saved the cluster info
1609     if self.op.candidate_pool_size is not None:
1610       _AdjustCandidatePool(self)
1611
1612
1613 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1614   """Distribute additional files which are part of the cluster configuration.
1615
1616   ConfigWriter takes care of distributing the config and ssconf files, but
1617   there are more files which should be distributed to all nodes. This function
1618   makes sure those are copied.
1619
1620   @param lu: calling logical unit
1621   @param additional_nodes: list of nodes not in the config to distribute to
1622
1623   """
1624   # 1. Gather target nodes
1625   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1626   dist_nodes = lu.cfg.GetNodeList()
1627   if additional_nodes is not None:
1628     dist_nodes.extend(additional_nodes)
1629   if myself.name in dist_nodes:
1630     dist_nodes.remove(myself.name)
1631   # 2. Gather files to distribute
1632   dist_files = set([constants.ETC_HOSTS,
1633                     constants.SSH_KNOWN_HOSTS_FILE,
1634                     constants.RAPI_CERT_FILE,
1635                     constants.RAPI_USERS_FILE,
1636                    ])
1637
1638   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1639   for hv_name in enabled_hypervisors:
1640     hv_class = hypervisor.GetHypervisor(hv_name)
1641     dist_files.update(hv_class.GetAncillaryFiles())
1642
1643   # 3. Perform the files upload
1644   for fname in dist_files:
1645     if os.path.exists(fname):
1646       result = lu.rpc.call_upload_file(dist_nodes, fname)
1647       for to_node, to_result in result.items():
1648         msg = to_result.fail_msg
1649         if msg:
1650           msg = ("Copy of file %s to node %s failed: %s" %
1651                  (fname, to_node, msg))
1652           lu.proc.LogWarning(msg)
1653
1654
1655 class LURedistributeConfig(NoHooksLU):
1656   """Force the redistribution of cluster configuration.
1657
1658   This is a very simple LU.
1659
1660   """
1661   _OP_REQP = []
1662   REQ_BGL = False
1663
1664   def ExpandNames(self):
1665     self.needed_locks = {
1666       locking.LEVEL_NODE: locking.ALL_SET,
1667     }
1668     self.share_locks[locking.LEVEL_NODE] = 1
1669
1670   def CheckPrereq(self):
1671     """Check prerequisites.
1672
1673     """
1674
1675   def Exec(self, feedback_fn):
1676     """Redistribute the configuration.
1677
1678     """
1679     self.cfg.Update(self.cfg.GetClusterInfo())
1680     _RedistributeAncillaryFiles(self)
1681
1682
1683 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1684   """Sleep and poll for an instance's disk to sync.
1685
1686   """
1687   if not instance.disks:
1688     return True
1689
1690   if not oneshot:
1691     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1692
1693   node = instance.primary_node
1694
1695   for dev in instance.disks:
1696     lu.cfg.SetDiskID(dev, node)
1697
1698   retries = 0
1699   degr_retries = 10 # in seconds, as we sleep 1 second each time
1700   while True:
1701     max_time = 0
1702     done = True
1703     cumul_degraded = False
1704     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1705     msg = rstats.fail_msg
1706     if msg:
1707       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1708       retries += 1
1709       if retries >= 10:
1710         raise errors.RemoteError("Can't contact node %s for mirror data,"
1711                                  " aborting." % node)
1712       time.sleep(6)
1713       continue
1714     rstats = rstats.payload
1715     retries = 0
1716     for i, mstat in enumerate(rstats):
1717       if mstat is None:
1718         lu.LogWarning("Can't compute data for node %s/%s",
1719                            node, instance.disks[i].iv_name)
1720         continue
1721       # we ignore the ldisk parameter
1722       perc_done, est_time, is_degraded, _ = mstat
1723       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1724       if perc_done is not None:
1725         done = False
1726         if est_time is not None:
1727           rem_time = "%d estimated seconds remaining" % est_time
1728           max_time = est_time
1729         else:
1730           rem_time = "no time estimate"
1731         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1732                         (instance.disks[i].iv_name, perc_done, rem_time))
1733
1734     # if we're done but degraded, let's do a few small retries, to
1735     # make sure we see a stable and not transient situation; therefore
1736     # we force restart of the loop
1737     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1738       logging.info("Degraded disks found, %d retries left", degr_retries)
1739       degr_retries -= 1
1740       time.sleep(1)
1741       continue
1742
1743     if done or oneshot:
1744       break
1745
1746     time.sleep(min(60, max_time))
1747
1748   if done:
1749     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1750   return not cumul_degraded
1751
1752
1753 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1754   """Check that mirrors are not degraded.
1755
1756   The ldisk parameter, if True, will change the test from the
1757   is_degraded attribute (which represents overall non-ok status for
1758   the device(s)) to the ldisk (representing the local storage status).
1759
1760   """
1761   lu.cfg.SetDiskID(dev, node)
1762   if ldisk:
1763     idx = 6
1764   else:
1765     idx = 5
1766
1767   result = True
1768   if on_primary or dev.AssembleOnSecondary():
1769     rstats = lu.rpc.call_blockdev_find(node, dev)
1770     msg = rstats.fail_msg
1771     if msg:
1772       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1773       result = False
1774     elif not rstats.payload:
1775       lu.LogWarning("Can't find disk on node %s", node)
1776       result = False
1777     else:
1778       result = result and (not rstats.payload[idx])
1779   if dev.children:
1780     for child in dev.children:
1781       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1782
1783   return result
1784
1785
1786 class LUDiagnoseOS(NoHooksLU):
1787   """Logical unit for OS diagnose/query.
1788
1789   """
1790   _OP_REQP = ["output_fields", "names"]
1791   REQ_BGL = False
1792   _FIELDS_STATIC = utils.FieldSet()
1793   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1794
1795   def ExpandNames(self):
1796     if self.op.names:
1797       raise errors.OpPrereqError("Selective OS query not supported")
1798
1799     _CheckOutputFields(static=self._FIELDS_STATIC,
1800                        dynamic=self._FIELDS_DYNAMIC,
1801                        selected=self.op.output_fields)
1802
1803     # Lock all nodes, in shared mode
1804     # Temporary removal of locks, should be reverted later
1805     # TODO: reintroduce locks when they are lighter-weight
1806     self.needed_locks = {}
1807     #self.share_locks[locking.LEVEL_NODE] = 1
1808     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1809
1810   def CheckPrereq(self):
1811     """Check prerequisites.
1812
1813     """
1814
1815   @staticmethod
1816   def _DiagnoseByOS(node_list, rlist):
1817     """Remaps a per-node return list into an a per-os per-node dictionary
1818
1819     @param node_list: a list with the names of all nodes
1820     @param rlist: a map with node names as keys and OS objects as values
1821
1822     @rtype: dict
1823     @return: a dictionary with osnames as keys and as value another map, with
1824         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1825
1826           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1827                                      (/srv/..., False, "invalid api")],
1828                            "node2": [(/srv/..., True, "")]}
1829           }
1830
1831     """
1832     all_os = {}
1833     # we build here the list of nodes that didn't fail the RPC (at RPC
1834     # level), so that nodes with a non-responding node daemon don't
1835     # make all OSes invalid
1836     good_nodes = [node_name for node_name in rlist
1837                   if not rlist[node_name].fail_msg]
1838     for node_name, nr in rlist.items():
1839       if nr.fail_msg or not nr.payload:
1840         continue
1841       for name, path, status, diagnose in nr.payload:
1842         if name not in all_os:
1843           # build a list of nodes for this os containing empty lists
1844           # for each node in node_list
1845           all_os[name] = {}
1846           for nname in good_nodes:
1847             all_os[name][nname] = []
1848         all_os[name][node_name].append((path, status, diagnose))
1849     return all_os
1850
1851   def Exec(self, feedback_fn):
1852     """Compute the list of OSes.
1853
1854     """
1855     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1856     node_data = self.rpc.call_os_diagnose(valid_nodes)
1857     pol = self._DiagnoseByOS(valid_nodes, node_data)
1858     output = []
1859     for os_name, os_data in pol.items():
1860       row = []
1861       for field in self.op.output_fields:
1862         if field == "name":
1863           val = os_name
1864         elif field == "valid":
1865           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1866         elif field == "node_status":
1867           # this is just a copy of the dict
1868           val = {}
1869           for node_name, nos_list in os_data.items():
1870             val[node_name] = nos_list
1871         else:
1872           raise errors.ParameterError(field)
1873         row.append(val)
1874       output.append(row)
1875
1876     return output
1877
1878
1879 class LURemoveNode(LogicalUnit):
1880   """Logical unit for removing a node.
1881
1882   """
1883   HPATH = "node-remove"
1884   HTYPE = constants.HTYPE_NODE
1885   _OP_REQP = ["node_name"]
1886
1887   def BuildHooksEnv(self):
1888     """Build hooks env.
1889
1890     This doesn't run on the target node in the pre phase as a failed
1891     node would then be impossible to remove.
1892
1893     """
1894     env = {
1895       "OP_TARGET": self.op.node_name,
1896       "NODE_NAME": self.op.node_name,
1897       }
1898     all_nodes = self.cfg.GetNodeList()
1899     all_nodes.remove(self.op.node_name)
1900     return env, all_nodes, all_nodes
1901
1902   def CheckPrereq(self):
1903     """Check prerequisites.
1904
1905     This checks:
1906      - the node exists in the configuration
1907      - it does not have primary or secondary instances
1908      - it's not the master
1909
1910     Any errors are signalled by raising errors.OpPrereqError.
1911
1912     """
1913     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1914     if node is None:
1915       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1916
1917     instance_list = self.cfg.GetInstanceList()
1918
1919     masternode = self.cfg.GetMasterNode()
1920     if node.name == masternode:
1921       raise errors.OpPrereqError("Node is the master node,"
1922                                  " you need to failover first.")
1923
1924     for instance_name in instance_list:
1925       instance = self.cfg.GetInstanceInfo(instance_name)
1926       if node.name in instance.all_nodes:
1927         raise errors.OpPrereqError("Instance %s is still running on the node,"
1928                                    " please remove first." % instance_name)
1929     self.op.node_name = node.name
1930     self.node = node
1931
1932   def Exec(self, feedback_fn):
1933     """Removes the node from the cluster.
1934
1935     """
1936     node = self.node
1937     logging.info("Stopping the node daemon and removing configs from node %s",
1938                  node.name)
1939
1940     self.context.RemoveNode(node.name)
1941
1942     result = self.rpc.call_node_leave_cluster(node.name)
1943     msg = result.fail_msg
1944     if msg:
1945       self.LogWarning("Errors encountered on the remote node while leaving"
1946                       " the cluster: %s", msg)
1947
1948     # Promote nodes to master candidate as needed
1949     _AdjustCandidatePool(self)
1950
1951
1952 class LUQueryNodes(NoHooksLU):
1953   """Logical unit for querying nodes.
1954
1955   """
1956   _OP_REQP = ["output_fields", "names", "use_locking"]
1957   REQ_BGL = False
1958   _FIELDS_DYNAMIC = utils.FieldSet(
1959     "dtotal", "dfree",
1960     "mtotal", "mnode", "mfree",
1961     "bootid",
1962     "ctotal", "cnodes", "csockets",
1963     )
1964
1965   _FIELDS_STATIC = utils.FieldSet(
1966     "name", "pinst_cnt", "sinst_cnt",
1967     "pinst_list", "sinst_list",
1968     "pip", "sip", "tags",
1969     "serial_no",
1970     "master_candidate",
1971     "master",
1972     "offline",
1973     "drained",
1974     )
1975
1976   def ExpandNames(self):
1977     _CheckOutputFields(static=self._FIELDS_STATIC,
1978                        dynamic=self._FIELDS_DYNAMIC,
1979                        selected=self.op.output_fields)
1980
1981     self.needed_locks = {}
1982     self.share_locks[locking.LEVEL_NODE] = 1
1983
1984     if self.op.names:
1985       self.wanted = _GetWantedNodes(self, self.op.names)
1986     else:
1987       self.wanted = locking.ALL_SET
1988
1989     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1990     self.do_locking = self.do_node_query and self.op.use_locking
1991     if self.do_locking:
1992       # if we don't request only static fields, we need to lock the nodes
1993       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1994
1995
1996   def CheckPrereq(self):
1997     """Check prerequisites.
1998
1999     """
2000     # The validation of the node list is done in the _GetWantedNodes,
2001     # if non empty, and if empty, there's no validation to do
2002     pass
2003
2004   def Exec(self, feedback_fn):
2005     """Computes the list of nodes and their attributes.
2006
2007     """
2008     all_info = self.cfg.GetAllNodesInfo()
2009     if self.do_locking:
2010       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2011     elif self.wanted != locking.ALL_SET:
2012       nodenames = self.wanted
2013       missing = set(nodenames).difference(all_info.keys())
2014       if missing:
2015         raise errors.OpExecError(
2016           "Some nodes were removed before retrieving their data: %s" % missing)
2017     else:
2018       nodenames = all_info.keys()
2019
2020     nodenames = utils.NiceSort(nodenames)
2021     nodelist = [all_info[name] for name in nodenames]
2022
2023     # begin data gathering
2024
2025     if self.do_node_query:
2026       live_data = {}
2027       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2028                                           self.cfg.GetHypervisorType())
2029       for name in nodenames:
2030         nodeinfo = node_data[name]
2031         if not nodeinfo.fail_msg and nodeinfo.payload:
2032           nodeinfo = nodeinfo.payload
2033           fn = utils.TryConvert
2034           live_data[name] = {
2035             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2036             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2037             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2038             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2039             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2040             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2041             "bootid": nodeinfo.get('bootid', None),
2042             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2043             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2044             }
2045         else:
2046           live_data[name] = {}
2047     else:
2048       live_data = dict.fromkeys(nodenames, {})
2049
2050     node_to_primary = dict([(name, set()) for name in nodenames])
2051     node_to_secondary = dict([(name, set()) for name in nodenames])
2052
2053     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2054                              "sinst_cnt", "sinst_list"))
2055     if inst_fields & frozenset(self.op.output_fields):
2056       instancelist = self.cfg.GetInstanceList()
2057
2058       for instance_name in instancelist:
2059         inst = self.cfg.GetInstanceInfo(instance_name)
2060         if inst.primary_node in node_to_primary:
2061           node_to_primary[inst.primary_node].add(inst.name)
2062         for secnode in inst.secondary_nodes:
2063           if secnode in node_to_secondary:
2064             node_to_secondary[secnode].add(inst.name)
2065
2066     master_node = self.cfg.GetMasterNode()
2067
2068     # end data gathering
2069
2070     output = []
2071     for node in nodelist:
2072       node_output = []
2073       for field in self.op.output_fields:
2074         if field == "name":
2075           val = node.name
2076         elif field == "pinst_list":
2077           val = list(node_to_primary[node.name])
2078         elif field == "sinst_list":
2079           val = list(node_to_secondary[node.name])
2080         elif field == "pinst_cnt":
2081           val = len(node_to_primary[node.name])
2082         elif field == "sinst_cnt":
2083           val = len(node_to_secondary[node.name])
2084         elif field == "pip":
2085           val = node.primary_ip
2086         elif field == "sip":
2087           val = node.secondary_ip
2088         elif field == "tags":
2089           val = list(node.GetTags())
2090         elif field == "serial_no":
2091           val = node.serial_no
2092         elif field == "master_candidate":
2093           val = node.master_candidate
2094         elif field == "master":
2095           val = node.name == master_node
2096         elif field == "offline":
2097           val = node.offline
2098         elif field == "drained":
2099           val = node.drained
2100         elif self._FIELDS_DYNAMIC.Matches(field):
2101           val = live_data[node.name].get(field, None)
2102         else:
2103           raise errors.ParameterError(field)
2104         node_output.append(val)
2105       output.append(node_output)
2106
2107     return output
2108
2109
2110 class LUQueryNodeVolumes(NoHooksLU):
2111   """Logical unit for getting volumes on node(s).
2112
2113   """
2114   _OP_REQP = ["nodes", "output_fields"]
2115   REQ_BGL = False
2116   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2117   _FIELDS_STATIC = utils.FieldSet("node")
2118
2119   def ExpandNames(self):
2120     _CheckOutputFields(static=self._FIELDS_STATIC,
2121                        dynamic=self._FIELDS_DYNAMIC,
2122                        selected=self.op.output_fields)
2123
2124     self.needed_locks = {}
2125     self.share_locks[locking.LEVEL_NODE] = 1
2126     if not self.op.nodes:
2127       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2128     else:
2129       self.needed_locks[locking.LEVEL_NODE] = \
2130         _GetWantedNodes(self, self.op.nodes)
2131
2132   def CheckPrereq(self):
2133     """Check prerequisites.
2134
2135     This checks that the fields required are valid output fields.
2136
2137     """
2138     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2139
2140   def Exec(self, feedback_fn):
2141     """Computes the list of nodes and their attributes.
2142
2143     """
2144     nodenames = self.nodes
2145     volumes = self.rpc.call_node_volumes(nodenames)
2146
2147     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2148              in self.cfg.GetInstanceList()]
2149
2150     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2151
2152     output = []
2153     for node in nodenames:
2154       nresult = volumes[node]
2155       if nresult.offline:
2156         continue
2157       msg = nresult.fail_msg
2158       if msg:
2159         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2160         continue
2161
2162       node_vols = nresult.payload[:]
2163       node_vols.sort(key=lambda vol: vol['dev'])
2164
2165       for vol in node_vols:
2166         node_output = []
2167         for field in self.op.output_fields:
2168           if field == "node":
2169             val = node
2170           elif field == "phys":
2171             val = vol['dev']
2172           elif field == "vg":
2173             val = vol['vg']
2174           elif field == "name":
2175             val = vol['name']
2176           elif field == "size":
2177             val = int(float(vol['size']))
2178           elif field == "instance":
2179             for inst in ilist:
2180               if node not in lv_by_node[inst]:
2181                 continue
2182               if vol['name'] in lv_by_node[inst][node]:
2183                 val = inst.name
2184                 break
2185             else:
2186               val = '-'
2187           else:
2188             raise errors.ParameterError(field)
2189           node_output.append(str(val))
2190
2191         output.append(node_output)
2192
2193     return output
2194
2195
2196 class LUAddNode(LogicalUnit):
2197   """Logical unit for adding node to the cluster.
2198
2199   """
2200   HPATH = "node-add"
2201   HTYPE = constants.HTYPE_NODE
2202   _OP_REQP = ["node_name"]
2203
2204   def BuildHooksEnv(self):
2205     """Build hooks env.
2206
2207     This will run on all nodes before, and on all nodes + the new node after.
2208
2209     """
2210     env = {
2211       "OP_TARGET": self.op.node_name,
2212       "NODE_NAME": self.op.node_name,
2213       "NODE_PIP": self.op.primary_ip,
2214       "NODE_SIP": self.op.secondary_ip,
2215       }
2216     nodes_0 = self.cfg.GetNodeList()
2217     nodes_1 = nodes_0 + [self.op.node_name, ]
2218     return env, nodes_0, nodes_1
2219
2220   def CheckPrereq(self):
2221     """Check prerequisites.
2222
2223     This checks:
2224      - the new node is not already in the config
2225      - it is resolvable
2226      - its parameters (single/dual homed) matches the cluster
2227
2228     Any errors are signalled by raising errors.OpPrereqError.
2229
2230     """
2231     node_name = self.op.node_name
2232     cfg = self.cfg
2233
2234     dns_data = utils.HostInfo(node_name)
2235
2236     node = dns_data.name
2237     primary_ip = self.op.primary_ip = dns_data.ip
2238     secondary_ip = getattr(self.op, "secondary_ip", None)
2239     if secondary_ip is None:
2240       secondary_ip = primary_ip
2241     if not utils.IsValidIP(secondary_ip):
2242       raise errors.OpPrereqError("Invalid secondary IP given")
2243     self.op.secondary_ip = secondary_ip
2244
2245     node_list = cfg.GetNodeList()
2246     if not self.op.readd and node in node_list:
2247       raise errors.OpPrereqError("Node %s is already in the configuration" %
2248                                  node)
2249     elif self.op.readd and node not in node_list:
2250       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2251
2252     for existing_node_name in node_list:
2253       existing_node = cfg.GetNodeInfo(existing_node_name)
2254
2255       if self.op.readd and node == existing_node_name:
2256         if (existing_node.primary_ip != primary_ip or
2257             existing_node.secondary_ip != secondary_ip):
2258           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2259                                      " address configuration as before")
2260         continue
2261
2262       if (existing_node.primary_ip == primary_ip or
2263           existing_node.secondary_ip == primary_ip or
2264           existing_node.primary_ip == secondary_ip or
2265           existing_node.secondary_ip == secondary_ip):
2266         raise errors.OpPrereqError("New node ip address(es) conflict with"
2267                                    " existing node %s" % existing_node.name)
2268
2269     # check that the type of the node (single versus dual homed) is the
2270     # same as for the master
2271     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2272     master_singlehomed = myself.secondary_ip == myself.primary_ip
2273     newbie_singlehomed = secondary_ip == primary_ip
2274     if master_singlehomed != newbie_singlehomed:
2275       if master_singlehomed:
2276         raise errors.OpPrereqError("The master has no private ip but the"
2277                                    " new node has one")
2278       else:
2279         raise errors.OpPrereqError("The master has a private ip but the"
2280                                    " new node doesn't have one")
2281
2282     # checks reachablity
2283     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2284       raise errors.OpPrereqError("Node not reachable by ping")
2285
2286     if not newbie_singlehomed:
2287       # check reachability from my secondary ip to newbie's secondary ip
2288       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2289                            source=myself.secondary_ip):
2290         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2291                                    " based ping to noded port")
2292
2293     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2294     mc_now, _ = self.cfg.GetMasterCandidateStats()
2295     master_candidate = mc_now < cp_size
2296
2297     self.new_node = objects.Node(name=node,
2298                                  primary_ip=primary_ip,
2299                                  secondary_ip=secondary_ip,
2300                                  master_candidate=master_candidate,
2301                                  offline=False, drained=False)
2302
2303   def Exec(self, feedback_fn):
2304     """Adds the new node to the cluster.
2305
2306     """
2307     new_node = self.new_node
2308     node = new_node.name
2309
2310     # check connectivity
2311     result = self.rpc.call_version([node])[node]
2312     result.Raise("Can't get version information from node %s" % node)
2313     if constants.PROTOCOL_VERSION == result.payload:
2314       logging.info("Communication to node %s fine, sw version %s match",
2315                    node, result.payload)
2316     else:
2317       raise errors.OpExecError("Version mismatch master version %s,"
2318                                " node version %s" %
2319                                (constants.PROTOCOL_VERSION, result.payload))
2320
2321     # setup ssh on node
2322     logging.info("Copy ssh key to node %s", node)
2323     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2324     keyarray = []
2325     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2326                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2327                 priv_key, pub_key]
2328
2329     for i in keyfiles:
2330       f = open(i, 'r')
2331       try:
2332         keyarray.append(f.read())
2333       finally:
2334         f.close()
2335
2336     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2337                                     keyarray[2],
2338                                     keyarray[3], keyarray[4], keyarray[5])
2339     result.Raise("Cannot transfer ssh keys to the new node")
2340
2341     # Add node to our /etc/hosts, and add key to known_hosts
2342     if self.cfg.GetClusterInfo().modify_etc_hosts:
2343       utils.AddHostToEtcHosts(new_node.name)
2344
2345     if new_node.secondary_ip != new_node.primary_ip:
2346       result = self.rpc.call_node_has_ip_address(new_node.name,
2347                                                  new_node.secondary_ip)
2348       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2349                    prereq=True)
2350       if not result.payload:
2351         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2352                                  " you gave (%s). Please fix and re-run this"
2353                                  " command." % new_node.secondary_ip)
2354
2355     node_verify_list = [self.cfg.GetMasterNode()]
2356     node_verify_param = {
2357       'nodelist': [node],
2358       # TODO: do a node-net-test as well?
2359     }
2360
2361     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2362                                        self.cfg.GetClusterName())
2363     for verifier in node_verify_list:
2364       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2365       nl_payload = result[verifier].payload['nodelist']
2366       if nl_payload:
2367         for failed in nl_payload:
2368           feedback_fn("ssh/hostname verification failed %s -> %s" %
2369                       (verifier, nl_payload[failed]))
2370         raise errors.OpExecError("ssh/hostname verification failed.")
2371
2372     if self.op.readd:
2373       _RedistributeAncillaryFiles(self)
2374       self.context.ReaddNode(new_node)
2375     else:
2376       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2377       self.context.AddNode(new_node)
2378
2379
2380 class LUSetNodeParams(LogicalUnit):
2381   """Modifies the parameters of a node.
2382
2383   """
2384   HPATH = "node-modify"
2385   HTYPE = constants.HTYPE_NODE
2386   _OP_REQP = ["node_name"]
2387   REQ_BGL = False
2388
2389   def CheckArguments(self):
2390     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2391     if node_name is None:
2392       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2393     self.op.node_name = node_name
2394     _CheckBooleanOpField(self.op, 'master_candidate')
2395     _CheckBooleanOpField(self.op, 'offline')
2396     _CheckBooleanOpField(self.op, 'drained')
2397     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2398     if all_mods.count(None) == 3:
2399       raise errors.OpPrereqError("Please pass at least one modification")
2400     if all_mods.count(True) > 1:
2401       raise errors.OpPrereqError("Can't set the node into more than one"
2402                                  " state at the same time")
2403
2404   def ExpandNames(self):
2405     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2406
2407   def BuildHooksEnv(self):
2408     """Build hooks env.
2409
2410     This runs on the master node.
2411
2412     """
2413     env = {
2414       "OP_TARGET": self.op.node_name,
2415       "MASTER_CANDIDATE": str(self.op.master_candidate),
2416       "OFFLINE": str(self.op.offline),
2417       "DRAINED": str(self.op.drained),
2418       }
2419     nl = [self.cfg.GetMasterNode(),
2420           self.op.node_name]
2421     return env, nl, nl
2422
2423   def CheckPrereq(self):
2424     """Check prerequisites.
2425
2426     This only checks the instance list against the existing names.
2427
2428     """
2429     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2430
2431     if ((self.op.master_candidate == False or self.op.offline == True or
2432          self.op.drained == True) and node.master_candidate):
2433       # we will demote the node from master_candidate
2434       if self.op.node_name == self.cfg.GetMasterNode():
2435         raise errors.OpPrereqError("The master node has to be a"
2436                                    " master candidate, online and not drained")
2437       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2438       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2439       if num_candidates <= cp_size:
2440         msg = ("Not enough master candidates (desired"
2441                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2442         if self.op.force:
2443           self.LogWarning(msg)
2444         else:
2445           raise errors.OpPrereqError(msg)
2446
2447     if (self.op.master_candidate == True and
2448         ((node.offline and not self.op.offline == False) or
2449          (node.drained and not self.op.drained == False))):
2450       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2451                                  " to master_candidate" % node.name)
2452
2453     return
2454
2455   def Exec(self, feedback_fn):
2456     """Modifies a node.
2457
2458     """
2459     node = self.node
2460
2461     result = []
2462     changed_mc = False
2463
2464     if self.op.offline is not None:
2465       node.offline = self.op.offline
2466       result.append(("offline", str(self.op.offline)))
2467       if self.op.offline == True:
2468         if node.master_candidate:
2469           node.master_candidate = False
2470           changed_mc = True
2471           result.append(("master_candidate", "auto-demotion due to offline"))
2472         if node.drained:
2473           node.drained = False
2474           result.append(("drained", "clear drained status due to offline"))
2475
2476     if self.op.master_candidate is not None:
2477       node.master_candidate = self.op.master_candidate
2478       changed_mc = True
2479       result.append(("master_candidate", str(self.op.master_candidate)))
2480       if self.op.master_candidate == False:
2481         rrc = self.rpc.call_node_demote_from_mc(node.name)
2482         msg = rrc.fail_msg
2483         if msg:
2484           self.LogWarning("Node failed to demote itself: %s" % msg)
2485
2486     if self.op.drained is not None:
2487       node.drained = self.op.drained
2488       result.append(("drained", str(self.op.drained)))
2489       if self.op.drained == True:
2490         if node.master_candidate:
2491           node.master_candidate = False
2492           changed_mc = True
2493           result.append(("master_candidate", "auto-demotion due to drain"))
2494         if node.offline:
2495           node.offline = False
2496           result.append(("offline", "clear offline status due to drain"))
2497
2498     # this will trigger configuration file update, if needed
2499     self.cfg.Update(node)
2500     # this will trigger job queue propagation or cleanup
2501     if changed_mc:
2502       self.context.ReaddNode(node)
2503
2504     return result
2505
2506
2507 class LUPowercycleNode(NoHooksLU):
2508   """Powercycles a node.
2509
2510   """
2511   _OP_REQP = ["node_name", "force"]
2512   REQ_BGL = False
2513
2514   def CheckArguments(self):
2515     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2516     if node_name is None:
2517       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2518     self.op.node_name = node_name
2519     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2520       raise errors.OpPrereqError("The node is the master and the force"
2521                                  " parameter was not set")
2522
2523   def ExpandNames(self):
2524     """Locking for PowercycleNode.
2525
2526     This is a last-resource option and shouldn't block on other
2527     jobs. Therefore, we grab no locks.
2528
2529     """
2530     self.needed_locks = {}
2531
2532   def CheckPrereq(self):
2533     """Check prerequisites.
2534
2535     This LU has no prereqs.
2536
2537     """
2538     pass
2539
2540   def Exec(self, feedback_fn):
2541     """Reboots a node.
2542
2543     """
2544     result = self.rpc.call_node_powercycle(self.op.node_name,
2545                                            self.cfg.GetHypervisorType())
2546     result.Raise("Failed to schedule the reboot")
2547     return result.payload
2548
2549
2550 class LUQueryClusterInfo(NoHooksLU):
2551   """Query cluster configuration.
2552
2553   """
2554   _OP_REQP = []
2555   REQ_BGL = False
2556
2557   def ExpandNames(self):
2558     self.needed_locks = {}
2559
2560   def CheckPrereq(self):
2561     """No prerequsites needed for this LU.
2562
2563     """
2564     pass
2565
2566   def Exec(self, feedback_fn):
2567     """Return cluster config.
2568
2569     """
2570     cluster = self.cfg.GetClusterInfo()
2571     result = {
2572       "software_version": constants.RELEASE_VERSION,
2573       "protocol_version": constants.PROTOCOL_VERSION,
2574       "config_version": constants.CONFIG_VERSION,
2575       "os_api_version": constants.OS_API_VERSION,
2576       "export_version": constants.EXPORT_VERSION,
2577       "architecture": (platform.architecture()[0], platform.machine()),
2578       "name": cluster.cluster_name,
2579       "master": cluster.master_node,
2580       "default_hypervisor": cluster.default_hypervisor,
2581       "enabled_hypervisors": cluster.enabled_hypervisors,
2582       "hvparams": dict([(hvname, cluster.hvparams[hvname])
2583                         for hvname in cluster.enabled_hypervisors]),
2584       "beparams": cluster.beparams,
2585       "nicparams": cluster.nicparams,
2586       "candidate_pool_size": cluster.candidate_pool_size,
2587       "master_netdev": cluster.master_netdev,
2588       "volume_group_name": cluster.volume_group_name,
2589       "file_storage_dir": cluster.file_storage_dir,
2590       }
2591
2592     return result
2593
2594
2595 class LUQueryConfigValues(NoHooksLU):
2596   """Return configuration values.
2597
2598   """
2599   _OP_REQP = []
2600   REQ_BGL = False
2601   _FIELDS_DYNAMIC = utils.FieldSet()
2602   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2603
2604   def ExpandNames(self):
2605     self.needed_locks = {}
2606
2607     _CheckOutputFields(static=self._FIELDS_STATIC,
2608                        dynamic=self._FIELDS_DYNAMIC,
2609                        selected=self.op.output_fields)
2610
2611   def CheckPrereq(self):
2612     """No prerequisites.
2613
2614     """
2615     pass
2616
2617   def Exec(self, feedback_fn):
2618     """Dump a representation of the cluster config to the standard output.
2619
2620     """
2621     values = []
2622     for field in self.op.output_fields:
2623       if field == "cluster_name":
2624         entry = self.cfg.GetClusterName()
2625       elif field == "master_node":
2626         entry = self.cfg.GetMasterNode()
2627       elif field == "drain_flag":
2628         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2629       else:
2630         raise errors.ParameterError(field)
2631       values.append(entry)
2632     return values
2633
2634
2635 class LUActivateInstanceDisks(NoHooksLU):
2636   """Bring up an instance's disks.
2637
2638   """
2639   _OP_REQP = ["instance_name"]
2640   REQ_BGL = False
2641
2642   def ExpandNames(self):
2643     self._ExpandAndLockInstance()
2644     self.needed_locks[locking.LEVEL_NODE] = []
2645     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2646
2647   def DeclareLocks(self, level):
2648     if level == locking.LEVEL_NODE:
2649       self._LockInstancesNodes()
2650
2651   def CheckPrereq(self):
2652     """Check prerequisites.
2653
2654     This checks that the instance is in the cluster.
2655
2656     """
2657     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2658     assert self.instance is not None, \
2659       "Cannot retrieve locked instance %s" % self.op.instance_name
2660     _CheckNodeOnline(self, self.instance.primary_node)
2661
2662   def Exec(self, feedback_fn):
2663     """Activate the disks.
2664
2665     """
2666     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2667     if not disks_ok:
2668       raise errors.OpExecError("Cannot activate block devices")
2669
2670     return disks_info
2671
2672
2673 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2674   """Prepare the block devices for an instance.
2675
2676   This sets up the block devices on all nodes.
2677
2678   @type lu: L{LogicalUnit}
2679   @param lu: the logical unit on whose behalf we execute
2680   @type instance: L{objects.Instance}
2681   @param instance: the instance for whose disks we assemble
2682   @type ignore_secondaries: boolean
2683   @param ignore_secondaries: if true, errors on secondary nodes
2684       won't result in an error return from the function
2685   @return: False if the operation failed, otherwise a list of
2686       (host, instance_visible_name, node_visible_name)
2687       with the mapping from node devices to instance devices
2688
2689   """
2690   device_info = []
2691   disks_ok = True
2692   iname = instance.name
2693   # With the two passes mechanism we try to reduce the window of
2694   # opportunity for the race condition of switching DRBD to primary
2695   # before handshaking occured, but we do not eliminate it
2696
2697   # The proper fix would be to wait (with some limits) until the
2698   # connection has been made and drbd transitions from WFConnection
2699   # into any other network-connected state (Connected, SyncTarget,
2700   # SyncSource, etc.)
2701
2702   # 1st pass, assemble on all nodes in secondary mode
2703   for inst_disk in instance.disks:
2704     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2705       lu.cfg.SetDiskID(node_disk, node)
2706       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2707       msg = result.fail_msg
2708       if msg:
2709         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2710                            " (is_primary=False, pass=1): %s",
2711                            inst_disk.iv_name, node, msg)
2712         if not ignore_secondaries:
2713           disks_ok = False
2714
2715   # FIXME: race condition on drbd migration to primary
2716
2717   # 2nd pass, do only the primary node
2718   for inst_disk in instance.disks:
2719     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2720       if node != instance.primary_node:
2721         continue
2722       lu.cfg.SetDiskID(node_disk, node)
2723       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2724       msg = result.fail_msg
2725       if msg:
2726         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2727                            " (is_primary=True, pass=2): %s",
2728                            inst_disk.iv_name, node, msg)
2729         disks_ok = False
2730     device_info.append((instance.primary_node, inst_disk.iv_name,
2731                         result.payload))
2732
2733   # leave the disks configured for the primary node
2734   # this is a workaround that would be fixed better by
2735   # improving the logical/physical id handling
2736   for disk in instance.disks:
2737     lu.cfg.SetDiskID(disk, instance.primary_node)
2738
2739   return disks_ok, device_info
2740
2741
2742 def _StartInstanceDisks(lu, instance, force):
2743   """Start the disks of an instance.
2744
2745   """
2746   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2747                                            ignore_secondaries=force)
2748   if not disks_ok:
2749     _ShutdownInstanceDisks(lu, instance)
2750     if force is not None and not force:
2751       lu.proc.LogWarning("", hint="If the message above refers to a"
2752                          " secondary node,"
2753                          " you can retry the operation using '--force'.")
2754     raise errors.OpExecError("Disk consistency error")
2755
2756
2757 class LUDeactivateInstanceDisks(NoHooksLU):
2758   """Shutdown an instance's disks.
2759
2760   """
2761   _OP_REQP = ["instance_name"]
2762   REQ_BGL = False
2763
2764   def ExpandNames(self):
2765     self._ExpandAndLockInstance()
2766     self.needed_locks[locking.LEVEL_NODE] = []
2767     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2768
2769   def DeclareLocks(self, level):
2770     if level == locking.LEVEL_NODE:
2771       self._LockInstancesNodes()
2772
2773   def CheckPrereq(self):
2774     """Check prerequisites.
2775
2776     This checks that the instance is in the cluster.
2777
2778     """
2779     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2780     assert self.instance is not None, \
2781       "Cannot retrieve locked instance %s" % self.op.instance_name
2782
2783   def Exec(self, feedback_fn):
2784     """Deactivate the disks
2785
2786     """
2787     instance = self.instance
2788     _SafeShutdownInstanceDisks(self, instance)
2789
2790
2791 def _SafeShutdownInstanceDisks(lu, instance):
2792   """Shutdown block devices of an instance.
2793
2794   This function checks if an instance is running, before calling
2795   _ShutdownInstanceDisks.
2796
2797   """
2798   pnode = instance.primary_node
2799   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2800   ins_l.Raise("Can't contact node %s" % pnode)
2801
2802   if instance.name in ins_l.payload:
2803     raise errors.OpExecError("Instance is running, can't shutdown"
2804                              " block devices.")
2805
2806   _ShutdownInstanceDisks(lu, instance)
2807
2808
2809 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2810   """Shutdown block devices of an instance.
2811
2812   This does the shutdown on all nodes of the instance.
2813
2814   If the ignore_primary is false, errors on the primary node are
2815   ignored.
2816
2817   """
2818   all_result = True
2819   for disk in instance.disks:
2820     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2821       lu.cfg.SetDiskID(top_disk, node)
2822       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2823       msg = result.fail_msg
2824       if msg:
2825         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2826                       disk.iv_name, node, msg)
2827         if not ignore_primary or node != instance.primary_node:
2828           all_result = False
2829   return all_result
2830
2831
2832 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2833   """Checks if a node has enough free memory.
2834
2835   This function check if a given node has the needed amount of free
2836   memory. In case the node has less memory or we cannot get the
2837   information from the node, this function raise an OpPrereqError
2838   exception.
2839
2840   @type lu: C{LogicalUnit}
2841   @param lu: a logical unit from which we get configuration data
2842   @type node: C{str}
2843   @param node: the node to check
2844   @type reason: C{str}
2845   @param reason: string to use in the error message
2846   @type requested: C{int}
2847   @param requested: the amount of memory in MiB to check for
2848   @type hypervisor_name: C{str}
2849   @param hypervisor_name: the hypervisor to ask for memory stats
2850   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2851       we cannot check the node
2852
2853   """
2854   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2855   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2856   free_mem = nodeinfo[node].payload.get('memory_free', None)
2857   if not isinstance(free_mem, int):
2858     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2859                                " was '%s'" % (node, free_mem))
2860   if requested > free_mem:
2861     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2862                                " needed %s MiB, available %s MiB" %
2863                                (node, reason, requested, free_mem))
2864
2865
2866 class LUStartupInstance(LogicalUnit):
2867   """Starts an instance.
2868
2869   """
2870   HPATH = "instance-start"
2871   HTYPE = constants.HTYPE_INSTANCE
2872   _OP_REQP = ["instance_name", "force"]
2873   REQ_BGL = False
2874
2875   def ExpandNames(self):
2876     self._ExpandAndLockInstance()
2877
2878   def BuildHooksEnv(self):
2879     """Build hooks env.
2880
2881     This runs on master, primary and secondary nodes of the instance.
2882
2883     """
2884     env = {
2885       "FORCE": self.op.force,
2886       }
2887     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2888     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2889     return env, nl, nl
2890
2891   def CheckPrereq(self):
2892     """Check prerequisites.
2893
2894     This checks that the instance is in the cluster.
2895
2896     """
2897     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2898     assert self.instance is not None, \
2899       "Cannot retrieve locked instance %s" % self.op.instance_name
2900
2901     # extra beparams
2902     self.beparams = getattr(self.op, "beparams", {})
2903     if self.beparams:
2904       if not isinstance(self.beparams, dict):
2905         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2906                                    " dict" % (type(self.beparams), ))
2907       # fill the beparams dict
2908       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2909       self.op.beparams = self.beparams
2910
2911     # extra hvparams
2912     self.hvparams = getattr(self.op, "hvparams", {})
2913     if self.hvparams:
2914       if not isinstance(self.hvparams, dict):
2915         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2916                                    " dict" % (type(self.hvparams), ))
2917
2918       # check hypervisor parameter syntax (locally)
2919       cluster = self.cfg.GetClusterInfo()
2920       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2921       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2922                                     instance.hvparams)
2923       filled_hvp.update(self.hvparams)
2924       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2925       hv_type.CheckParameterSyntax(filled_hvp)
2926       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2927       self.op.hvparams = self.hvparams
2928
2929     _CheckNodeOnline(self, instance.primary_node)
2930
2931     bep = self.cfg.GetClusterInfo().FillBE(instance)
2932     # check bridges existance
2933     _CheckInstanceBridgesExist(self, instance)
2934
2935     remote_info = self.rpc.call_instance_info(instance.primary_node,
2936                                               instance.name,
2937                                               instance.hypervisor)
2938     remote_info.Raise("Error checking node %s" % instance.primary_node,
2939                       prereq=True)
2940     if not remote_info.payload: # not running already
2941       _CheckNodeFreeMemory(self, instance.primary_node,
2942                            "starting instance %s" % instance.name,
2943                            bep[constants.BE_MEMORY], instance.hypervisor)
2944
2945   def Exec(self, feedback_fn):
2946     """Start the instance.
2947
2948     """
2949     instance = self.instance
2950     force = self.op.force
2951
2952     self.cfg.MarkInstanceUp(instance.name)
2953
2954     node_current = instance.primary_node
2955
2956     _StartInstanceDisks(self, instance, force)
2957
2958     result = self.rpc.call_instance_start(node_current, instance,
2959                                           self.hvparams, self.beparams)
2960     msg = result.fail_msg
2961     if msg:
2962       _ShutdownInstanceDisks(self, instance)
2963       raise errors.OpExecError("Could not start instance: %s" % msg)
2964
2965
2966 class LURebootInstance(LogicalUnit):
2967   """Reboot an instance.
2968
2969   """
2970   HPATH = "instance-reboot"
2971   HTYPE = constants.HTYPE_INSTANCE
2972   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2973   REQ_BGL = False
2974
2975   def ExpandNames(self):
2976     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2977                                    constants.INSTANCE_REBOOT_HARD,
2978                                    constants.INSTANCE_REBOOT_FULL]:
2979       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2980                                   (constants.INSTANCE_REBOOT_SOFT,
2981                                    constants.INSTANCE_REBOOT_HARD,
2982                                    constants.INSTANCE_REBOOT_FULL))
2983     self._ExpandAndLockInstance()
2984
2985   def BuildHooksEnv(self):
2986     """Build hooks env.
2987
2988     This runs on master, primary and secondary nodes of the instance.
2989
2990     """
2991     env = {
2992       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2993       "REBOOT_TYPE": self.op.reboot_type,
2994       }
2995     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2996     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2997     return env, nl, nl
2998
2999   def CheckPrereq(self):
3000     """Check prerequisites.
3001
3002     This checks that the instance is in the cluster.
3003
3004     """
3005     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3006     assert self.instance is not None, \
3007       "Cannot retrieve locked instance %s" % self.op.instance_name
3008
3009     _CheckNodeOnline(self, instance.primary_node)
3010
3011     # check bridges existance
3012     _CheckInstanceBridgesExist(self, instance)
3013
3014   def Exec(self, feedback_fn):
3015     """Reboot the instance.
3016
3017     """
3018     instance = self.instance
3019     ignore_secondaries = self.op.ignore_secondaries
3020     reboot_type = self.op.reboot_type
3021
3022     node_current = instance.primary_node
3023
3024     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3025                        constants.INSTANCE_REBOOT_HARD]:
3026       for disk in instance.disks:
3027         self.cfg.SetDiskID(disk, node_current)
3028       result = self.rpc.call_instance_reboot(node_current, instance,
3029                                              reboot_type)
3030       result.Raise("Could not reboot instance")
3031     else:
3032       result = self.rpc.call_instance_shutdown(node_current, instance)
3033       result.Raise("Could not shutdown instance for full reboot")
3034       _ShutdownInstanceDisks(self, instance)
3035       _StartInstanceDisks(self, instance, ignore_secondaries)
3036       result = self.rpc.call_instance_start(node_current, instance, None, None)
3037       msg = result.fail_msg
3038       if msg:
3039         _ShutdownInstanceDisks(self, instance)
3040         raise errors.OpExecError("Could not start instance for"
3041                                  " full reboot: %s" % msg)
3042
3043     self.cfg.MarkInstanceUp(instance.name)
3044
3045
3046 class LUShutdownInstance(LogicalUnit):
3047   """Shutdown an instance.
3048
3049   """
3050   HPATH = "instance-stop"
3051   HTYPE = constants.HTYPE_INSTANCE
3052   _OP_REQP = ["instance_name"]
3053   REQ_BGL = False
3054
3055   def ExpandNames(self):
3056     self._ExpandAndLockInstance()
3057
3058   def BuildHooksEnv(self):
3059     """Build hooks env.
3060
3061     This runs on master, primary and secondary nodes of the instance.
3062
3063     """
3064     env = _BuildInstanceHookEnvByObject(self, self.instance)
3065     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3066     return env, nl, nl
3067
3068   def CheckPrereq(self):
3069     """Check prerequisites.
3070
3071     This checks that the instance is in the cluster.
3072
3073     """
3074     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3075     assert self.instance is not None, \
3076       "Cannot retrieve locked instance %s" % self.op.instance_name
3077     _CheckNodeOnline(self, self.instance.primary_node)
3078
3079   def Exec(self, feedback_fn):
3080     """Shutdown the instance.
3081
3082     """
3083     instance = self.instance
3084     node_current = instance.primary_node
3085     self.cfg.MarkInstanceDown(instance.name)
3086     result = self.rpc.call_instance_shutdown(node_current, instance)
3087     msg = result.fail_msg
3088     if msg:
3089       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3090
3091     _ShutdownInstanceDisks(self, instance)
3092
3093
3094 class LUReinstallInstance(LogicalUnit):
3095   """Reinstall an instance.
3096
3097   """
3098   HPATH = "instance-reinstall"
3099   HTYPE = constants.HTYPE_INSTANCE
3100   _OP_REQP = ["instance_name"]
3101   REQ_BGL = False
3102
3103   def ExpandNames(self):
3104     self._ExpandAndLockInstance()
3105
3106   def BuildHooksEnv(self):
3107     """Build hooks env.
3108
3109     This runs on master, primary and secondary nodes of the instance.
3110
3111     """
3112     env = _BuildInstanceHookEnvByObject(self, self.instance)
3113     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3114     return env, nl, nl
3115
3116   def CheckPrereq(self):
3117     """Check prerequisites.
3118
3119     This checks that the instance is in the cluster and is not running.
3120
3121     """
3122     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3123     assert instance is not None, \
3124       "Cannot retrieve locked instance %s" % self.op.instance_name
3125     _CheckNodeOnline(self, instance.primary_node)
3126
3127     if instance.disk_template == constants.DT_DISKLESS:
3128       raise errors.OpPrereqError("Instance '%s' has no disks" %
3129                                  self.op.instance_name)
3130     if instance.admin_up:
3131       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3132                                  self.op.instance_name)
3133     remote_info = self.rpc.call_instance_info(instance.primary_node,
3134                                               instance.name,
3135                                               instance.hypervisor)
3136     remote_info.Raise("Error checking node %s" % instance.primary_node,
3137                       prereq=True)
3138     if remote_info.payload:
3139       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3140                                  (self.op.instance_name,
3141                                   instance.primary_node))
3142
3143     self.op.os_type = getattr(self.op, "os_type", None)
3144     if self.op.os_type is not None:
3145       # OS verification
3146       pnode = self.cfg.GetNodeInfo(
3147         self.cfg.ExpandNodeName(instance.primary_node))
3148       if pnode is None:
3149         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3150                                    self.op.pnode)
3151       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3152       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3153                    (self.op.os_type, pnode.name), prereq=True)
3154
3155     self.instance = instance
3156
3157   def Exec(self, feedback_fn):
3158     """Reinstall the instance.
3159
3160     """
3161     inst = self.instance
3162
3163     if self.op.os_type is not None:
3164       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3165       inst.os = self.op.os_type
3166       self.cfg.Update(inst)
3167
3168     _StartInstanceDisks(self, inst, None)
3169     try:
3170       feedback_fn("Running the instance OS create scripts...")
3171       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3172       result.Raise("Could not install OS for instance %s on node %s" %
3173                    (inst.name, inst.primary_node))
3174     finally:
3175       _ShutdownInstanceDisks(self, inst)
3176
3177
3178 class LURenameInstance(LogicalUnit):
3179   """Rename an instance.
3180
3181   """
3182   HPATH = "instance-rename"
3183   HTYPE = constants.HTYPE_INSTANCE
3184   _OP_REQP = ["instance_name", "new_name"]
3185
3186   def BuildHooksEnv(self):
3187     """Build hooks env.
3188
3189     This runs on master, primary and secondary nodes of the instance.
3190
3191     """
3192     env = _BuildInstanceHookEnvByObject(self, self.instance)
3193     env["INSTANCE_NEW_NAME"] = self.op.new_name
3194     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3195     return env, nl, nl
3196
3197   def CheckPrereq(self):
3198     """Check prerequisites.
3199
3200     This checks that the instance is in the cluster and is not running.
3201
3202     """
3203     instance = self.cfg.GetInstanceInfo(
3204       self.cfg.ExpandInstanceName(self.op.instance_name))
3205     if instance is None:
3206       raise errors.OpPrereqError("Instance '%s' not known" %
3207                                  self.op.instance_name)
3208     _CheckNodeOnline(self, instance.primary_node)
3209
3210     if instance.admin_up:
3211       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3212                                  self.op.instance_name)
3213     remote_info = self.rpc.call_instance_info(instance.primary_node,
3214                                               instance.name,
3215                                               instance.hypervisor)
3216     remote_info.Raise("Error checking node %s" % instance.primary_node,
3217                       prereq=True)
3218     if remote_info.payload:
3219       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3220                                  (self.op.instance_name,
3221                                   instance.primary_node))
3222     self.instance = instance
3223
3224     # new name verification
3225     name_info = utils.HostInfo(self.op.new_name)
3226
3227     self.op.new_name = new_name = name_info.name
3228     instance_list = self.cfg.GetInstanceList()
3229     if new_name in instance_list:
3230       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3231                                  new_name)
3232
3233     if not getattr(self.op, "ignore_ip", False):
3234       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3235         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3236                                    (name_info.ip, new_name))
3237
3238
3239   def Exec(self, feedback_fn):
3240     """Reinstall the instance.
3241
3242     """
3243     inst = self.instance
3244     old_name = inst.name
3245
3246     if inst.disk_template == constants.DT_FILE:
3247       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3248
3249     self.cfg.RenameInstance(inst.name, self.op.new_name)
3250     # Change the instance lock. This is definitely safe while we hold the BGL
3251     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3252     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3253
3254     # re-read the instance from the configuration after rename
3255     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3256
3257     if inst.disk_template == constants.DT_FILE:
3258       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3259       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3260                                                      old_file_storage_dir,
3261                                                      new_file_storage_dir)
3262       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3263                    " (but the instance has been renamed in Ganeti)" %
3264                    (inst.primary_node, old_file_storage_dir,
3265                     new_file_storage_dir))
3266
3267     _StartInstanceDisks(self, inst, None)
3268     try:
3269       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3270                                                  old_name)
3271       msg = result.fail_msg
3272       if msg:
3273         msg = ("Could not run OS rename script for instance %s on node %s"
3274                " (but the instance has been renamed in Ganeti): %s" %
3275                (inst.name, inst.primary_node, msg))
3276         self.proc.LogWarning(msg)
3277     finally:
3278       _ShutdownInstanceDisks(self, inst)
3279
3280
3281 class LURemoveInstance(LogicalUnit):
3282   """Remove an instance.
3283
3284   """
3285   HPATH = "instance-remove"
3286   HTYPE = constants.HTYPE_INSTANCE
3287   _OP_REQP = ["instance_name", "ignore_failures"]
3288   REQ_BGL = False
3289
3290   def ExpandNames(self):
3291     self._ExpandAndLockInstance()
3292     self.needed_locks[locking.LEVEL_NODE] = []
3293     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3294
3295   def DeclareLocks(self, level):
3296     if level == locking.LEVEL_NODE:
3297       self._LockInstancesNodes()
3298
3299   def BuildHooksEnv(self):
3300     """Build hooks env.
3301
3302     This runs on master, primary and secondary nodes of the instance.
3303
3304     """
3305     env = _BuildInstanceHookEnvByObject(self, self.instance)
3306     nl = [self.cfg.GetMasterNode()]
3307     return env, nl, nl
3308
3309   def CheckPrereq(self):
3310     """Check prerequisites.
3311
3312     This checks that the instance is in the cluster.
3313
3314     """
3315     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3316     assert self.instance is not None, \
3317       "Cannot retrieve locked instance %s" % self.op.instance_name
3318
3319   def Exec(self, feedback_fn):
3320     """Remove the instance.
3321
3322     """
3323     instance = self.instance
3324     logging.info("Shutting down instance %s on node %s",
3325                  instance.name, instance.primary_node)
3326
3327     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3328     msg = result.fail_msg
3329     if msg:
3330       if self.op.ignore_failures:
3331         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3332       else:
3333         raise errors.OpExecError("Could not shutdown instance %s on"
3334                                  " node %s: %s" %
3335                                  (instance.name, instance.primary_node, msg))
3336
3337     logging.info("Removing block devices for instance %s", instance.name)
3338
3339     if not _RemoveDisks(self, instance):
3340       if self.op.ignore_failures:
3341         feedback_fn("Warning: can't remove instance's disks")
3342       else:
3343         raise errors.OpExecError("Can't remove instance's disks")
3344
3345     logging.info("Removing instance %s out of cluster config", instance.name)
3346
3347     self.cfg.RemoveInstance(instance.name)
3348     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3349
3350
3351 class LUQueryInstances(NoHooksLU):
3352   """Logical unit for querying instances.
3353
3354   """
3355   _OP_REQP = ["output_fields", "names", "use_locking"]
3356   REQ_BGL = False
3357   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3358                                     "admin_state",
3359                                     "disk_template", "ip", "mac", "bridge",
3360                                     "nic_mode", "nic_link",
3361                                     "sda_size", "sdb_size", "vcpus", "tags",
3362                                     "network_port", "beparams",
3363                                     r"(disk)\.(size)/([0-9]+)",
3364                                     r"(disk)\.(sizes)", "disk_usage",
3365                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3366                                     r"(nic)\.(bridge)/([0-9]+)",
3367                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3368                                     r"(disk|nic)\.(count)",
3369                                     "serial_no", "hypervisor", "hvparams",] +
3370                                   ["hv/%s" % name
3371                                    for name in constants.HVS_PARAMETERS] +
3372                                   ["be/%s" % name
3373                                    for name in constants.BES_PARAMETERS])
3374   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3375
3376
3377   def ExpandNames(self):
3378     _CheckOutputFields(static=self._FIELDS_STATIC,
3379                        dynamic=self._FIELDS_DYNAMIC,
3380                        selected=self.op.output_fields)
3381
3382     self.needed_locks = {}
3383     self.share_locks[locking.LEVEL_INSTANCE] = 1
3384     self.share_locks[locking.LEVEL_NODE] = 1
3385
3386     if self.op.names:
3387       self.wanted = _GetWantedInstances(self, self.op.names)
3388     else:
3389       self.wanted = locking.ALL_SET
3390
3391     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3392     self.do_locking = self.do_node_query and self.op.use_locking
3393     if self.do_locking:
3394       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3395       self.needed_locks[locking.LEVEL_NODE] = []
3396       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3397
3398   def DeclareLocks(self, level):
3399     if level == locking.LEVEL_NODE and self.do_locking:
3400       self._LockInstancesNodes()
3401
3402   def CheckPrereq(self):
3403     """Check prerequisites.
3404
3405     """
3406     pass
3407
3408   def Exec(self, feedback_fn):
3409     """Computes the list of nodes and their attributes.
3410
3411     """
3412     all_info = self.cfg.GetAllInstancesInfo()
3413     if self.wanted == locking.ALL_SET:
3414       # caller didn't specify instance names, so ordering is not important
3415       if self.do_locking:
3416         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3417       else:
3418         instance_names = all_info.keys()
3419       instance_names = utils.NiceSort(instance_names)
3420     else:
3421       # caller did specify names, so we must keep the ordering
3422       if self.do_locking:
3423         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3424       else:
3425         tgt_set = all_info.keys()
3426       missing = set(self.wanted).difference(tgt_set)
3427       if missing:
3428         raise errors.OpExecError("Some instances were removed before"
3429                                  " retrieving their data: %s" % missing)
3430       instance_names = self.wanted
3431
3432     instance_list = [all_info[iname] for iname in instance_names]
3433
3434     # begin data gathering
3435
3436     nodes = frozenset([inst.primary_node for inst in instance_list])
3437     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3438
3439     bad_nodes = []
3440     off_nodes = []
3441     if self.do_node_query:
3442       live_data = {}
3443       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3444       for name in nodes:
3445         result = node_data[name]
3446         if result.offline:
3447           # offline nodes will be in both lists
3448           off_nodes.append(name)
3449         if result.failed or result.fail_msg:
3450           bad_nodes.append(name)
3451         else:
3452           if result.payload:
3453             live_data.update(result.payload)
3454           # else no instance is alive
3455     else:
3456       live_data = dict([(name, {}) for name in instance_names])
3457
3458     # end data gathering
3459
3460     HVPREFIX = "hv/"
3461     BEPREFIX = "be/"
3462     output = []
3463     cluster = self.cfg.GetClusterInfo()
3464     for instance in instance_list:
3465       iout = []
3466       i_hv = cluster.FillHV(instance)
3467       i_be = cluster.FillBE(instance)
3468       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3469                                  nic.nicparams) for nic in instance.nics]
3470       for field in self.op.output_fields:
3471         st_match = self._FIELDS_STATIC.Matches(field)
3472         if field == "name":
3473           val = instance.name
3474         elif field == "os":
3475           val = instance.os
3476         elif field == "pnode":
3477           val = instance.primary_node
3478         elif field == "snodes":
3479           val = list(instance.secondary_nodes)
3480         elif field == "admin_state":
3481           val = instance.admin_up
3482         elif field == "oper_state":
3483           if instance.primary_node in bad_nodes:
3484             val = None
3485           else:
3486             val = bool(live_data.get(instance.name))
3487         elif field == "status":
3488           if instance.primary_node in off_nodes:
3489             val = "ERROR_nodeoffline"
3490           elif instance.primary_node in bad_nodes:
3491             val = "ERROR_nodedown"
3492           else:
3493             running = bool(live_data.get(instance.name))
3494             if running:
3495               if instance.admin_up:
3496                 val = "running"
3497               else:
3498                 val = "ERROR_up"
3499             else:
3500               if instance.admin_up:
3501                 val = "ERROR_down"
3502               else:
3503                 val = "ADMIN_down"
3504         elif field == "oper_ram":
3505           if instance.primary_node in bad_nodes:
3506             val = None
3507           elif instance.name in live_data:
3508             val = live_data[instance.name].get("memory", "?")
3509           else:
3510             val = "-"
3511         elif field == "disk_template":
3512           val = instance.disk_template
3513         elif field == "ip":
3514           if instance.nics:
3515             val = instance.nics[0].ip
3516           else:
3517             val = None
3518         elif field == "nic_mode":
3519           if instance.nics:
3520             val = i_nicp[0][constants.NIC_MODE]
3521           else:
3522             val = None
3523         elif field == "nic_link":
3524           if instance.nics:
3525             val = i_nicp[0][constants.NIC_LINK]
3526           else:
3527             val = None
3528         elif field == "bridge":
3529           if (instance.nics and
3530               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3531             val = i_nicp[0][constants.NIC_LINK]
3532           else:
3533             val = None
3534         elif field == "mac":
3535           if instance.nics:
3536             val = instance.nics[0].mac
3537           else:
3538             val = None
3539         elif field == "sda_size" or field == "sdb_size":
3540           idx = ord(field[2]) - ord('a')
3541           try:
3542             val = instance.FindDisk(idx).size
3543           except errors.OpPrereqError:
3544             val = None
3545         elif field == "disk_usage": # total disk usage per node
3546           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3547           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3548         elif field == "tags":
3549           val = list(instance.GetTags())
3550         elif field == "serial_no":
3551           val = instance.serial_no
3552         elif field == "network_port":
3553           val = instance.network_port
3554         elif field == "hypervisor":
3555           val = instance.hypervisor
3556         elif field == "hvparams":
3557           val = i_hv
3558         elif (field.startswith(HVPREFIX) and
3559               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3560           val = i_hv.get(field[len(HVPREFIX):], None)
3561         elif field == "beparams":
3562           val = i_be
3563         elif (field.startswith(BEPREFIX) and
3564               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3565           val = i_be.get(field[len(BEPREFIX):], None)
3566         elif st_match and st_match.groups():
3567           # matches a variable list
3568           st_groups = st_match.groups()
3569           if st_groups and st_groups[0] == "disk":
3570             if st_groups[1] == "count":
3571               val = len(instance.disks)
3572             elif st_groups[1] == "sizes":
3573               val = [disk.size for disk in instance.disks]
3574             elif st_groups[1] == "size":
3575               try:
3576                 val = instance.FindDisk(st_groups[2]).size
3577               except errors.OpPrereqError:
3578                 val = None
3579             else:
3580               assert False, "Unhandled disk parameter"
3581           elif st_groups[0] == "nic":
3582             if st_groups[1] == "count":
3583               val = len(instance.nics)
3584             elif st_groups[1] == "macs":
3585               val = [nic.mac for nic in instance.nics]
3586             elif st_groups[1] == "ips":
3587               val = [nic.ip for nic in instance.nics]
3588             elif st_groups[1] == "modes":
3589               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3590             elif st_groups[1] == "links":
3591               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3592             elif st_groups[1] == "bridges":
3593               val = []
3594               for nicp in i_nicp:
3595                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3596                   val.append(nicp[constants.NIC_LINK])
3597                 else:
3598                   val.append(None)
3599             else:
3600               # index-based item
3601               nic_idx = int(st_groups[2])
3602               if nic_idx >= len(instance.nics):
3603                 val = None
3604               else:
3605                 if st_groups[1] == "mac":
3606                   val = instance.nics[nic_idx].mac
3607                 elif st_groups[1] == "ip":
3608                   val = instance.nics[nic_idx].ip
3609                 elif st_groups[1] == "mode":
3610                   val = i_nicp[nic_idx][constants.NIC_MODE]
3611                 elif st_groups[1] == "link":
3612                   val = i_nicp[nic_idx][constants.NIC_LINK]
3613                 elif st_groups[1] == "bridge":
3614                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3615                   if nic_mode == constants.NIC_MODE_BRIDGED:
3616                     val = i_nicp[nic_idx][constants.NIC_LINK]
3617                   else:
3618                     val = None
3619                 else:
3620                   assert False, "Unhandled NIC parameter"
3621           else:
3622             assert False, "Unhandled variable parameter"
3623         else:
3624           raise errors.ParameterError(field)
3625         iout.append(val)
3626       output.append(iout)
3627
3628     return output
3629
3630
3631 class LUFailoverInstance(LogicalUnit):
3632   """Failover an instance.
3633
3634   """
3635   HPATH = "instance-failover"
3636   HTYPE = constants.HTYPE_INSTANCE
3637   _OP_REQP = ["instance_name", "ignore_consistency"]
3638   REQ_BGL = False
3639
3640   def ExpandNames(self):
3641     self._ExpandAndLockInstance()
3642     self.needed_locks[locking.LEVEL_NODE] = []
3643     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3644
3645   def DeclareLocks(self, level):
3646     if level == locking.LEVEL_NODE:
3647       self._LockInstancesNodes()
3648
3649   def BuildHooksEnv(self):
3650     """Build hooks env.
3651
3652     This runs on master, primary and secondary nodes of the instance.
3653
3654     """
3655     env = {
3656       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3657       }
3658     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3659     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3660     return env, nl, nl
3661
3662   def CheckPrereq(self):
3663     """Check prerequisites.
3664
3665     This checks that the instance is in the cluster.
3666
3667     """
3668     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3669     assert self.instance is not None, \
3670       "Cannot retrieve locked instance %s" % self.op.instance_name
3671
3672     bep = self.cfg.GetClusterInfo().FillBE(instance)
3673     if instance.disk_template not in constants.DTS_NET_MIRROR:
3674       raise errors.OpPrereqError("Instance's disk layout is not"
3675                                  " network mirrored, cannot failover.")
3676
3677     secondary_nodes = instance.secondary_nodes
3678     if not secondary_nodes:
3679       raise errors.ProgrammerError("no secondary node but using "
3680                                    "a mirrored disk template")
3681
3682     target_node = secondary_nodes[0]
3683     _CheckNodeOnline(self, target_node)
3684     _CheckNodeNotDrained(self, target_node)
3685     if instance.admin_up:
3686       # check memory requirements on the secondary node
3687       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3688                            instance.name, bep[constants.BE_MEMORY],
3689                            instance.hypervisor)
3690     else:
3691       self.LogInfo("Not checking memory on the secondary node as"
3692                    " instance will not be started")
3693
3694     # check bridge existance
3695     _CheckInstanceBridgesExist(self, instance, node=target_node)
3696
3697   def Exec(self, feedback_fn):
3698     """Failover an instance.
3699
3700     The failover is done by shutting it down on its present node and
3701     starting it on the secondary.
3702
3703     """
3704     instance = self.instance
3705
3706     source_node = instance.primary_node
3707     target_node = instance.secondary_nodes[0]
3708
3709     feedback_fn("* checking disk consistency between source and target")
3710     for dev in instance.disks:
3711       # for drbd, these are drbd over lvm
3712       if not _CheckDiskConsistency(self, dev, target_node, False):
3713         if instance.admin_up and not self.op.ignore_consistency:
3714           raise errors.OpExecError("Disk %s is degraded on target node,"
3715                                    " aborting failover." % dev.iv_name)
3716
3717     feedback_fn("* shutting down instance on source node")
3718     logging.info("Shutting down instance %s on node %s",
3719                  instance.name, source_node)
3720
3721     result = self.rpc.call_instance_shutdown(source_node, instance)
3722     msg = result.fail_msg
3723     if msg:
3724       if self.op.ignore_consistency:
3725         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3726                              " Proceeding anyway. Please make sure node"
3727                              " %s is down. Error details: %s",
3728                              instance.name, source_node, source_node, msg)
3729       else:
3730         raise errors.OpExecError("Could not shutdown instance %s on"
3731                                  " node %s: %s" %
3732                                  (instance.name, source_node, msg))
3733
3734     feedback_fn("* deactivating the instance's disks on source node")
3735     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3736       raise errors.OpExecError("Can't shut down the instance's disks.")
3737
3738     instance.primary_node = target_node
3739     # distribute new instance config to the other nodes
3740     self.cfg.Update(instance)
3741
3742     # Only start the instance if it's marked as up
3743     if instance.admin_up:
3744       feedback_fn("* activating the instance's disks on target node")
3745       logging.info("Starting instance %s on node %s",
3746                    instance.name, target_node)
3747
3748       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3749                                                ignore_secondaries=True)
3750       if not disks_ok:
3751         _ShutdownInstanceDisks(self, instance)
3752         raise errors.OpExecError("Can't activate the instance's disks")
3753
3754       feedback_fn("* starting the instance on the target node")
3755       result = self.rpc.call_instance_start(target_node, instance, None, None)
3756       msg = result.fail_msg
3757       if msg:
3758         _ShutdownInstanceDisks(self, instance)
3759         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3760                                  (instance.name, target_node, msg))
3761
3762
3763 class LUMigrateInstance(LogicalUnit):
3764   """Migrate an instance.
3765
3766   This is migration without shutting down, compared to the failover,
3767   which is done with shutdown.
3768
3769   """
3770   HPATH = "instance-migrate"
3771   HTYPE = constants.HTYPE_INSTANCE
3772   _OP_REQP = ["instance_name", "live", "cleanup"]
3773
3774   REQ_BGL = False
3775
3776   def ExpandNames(self):
3777     self._ExpandAndLockInstance()
3778     self.needed_locks[locking.LEVEL_NODE] = []
3779     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3780
3781   def DeclareLocks(self, level):
3782     if level == locking.LEVEL_NODE:
3783       self._LockInstancesNodes()
3784
3785   def BuildHooksEnv(self):
3786     """Build hooks env.
3787
3788     This runs on master, primary and secondary nodes of the instance.
3789
3790     """
3791     env = _BuildInstanceHookEnvByObject(self, self.instance)
3792     env["MIGRATE_LIVE"] = self.op.live
3793     env["MIGRATE_CLEANUP"] = self.op.cleanup
3794     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3795     return env, nl, nl
3796
3797   def CheckPrereq(self):
3798     """Check prerequisites.
3799
3800     This checks that the instance is in the cluster.
3801
3802     """
3803     instance = self.cfg.GetInstanceInfo(
3804       self.cfg.ExpandInstanceName(self.op.instance_name))
3805     if instance is None:
3806       raise errors.OpPrereqError("Instance '%s' not known" %
3807                                  self.op.instance_name)
3808
3809     if instance.disk_template != constants.DT_DRBD8:
3810       raise errors.OpPrereqError("Instance's disk layout is not"
3811                                  " drbd8, cannot migrate.")
3812
3813     secondary_nodes = instance.secondary_nodes
3814     if not secondary_nodes:
3815       raise errors.ConfigurationError("No secondary node but using"
3816                                       " drbd8 disk template")
3817
3818     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3819
3820     target_node = secondary_nodes[0]
3821     # check memory requirements on the secondary node
3822     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3823                          instance.name, i_be[constants.BE_MEMORY],
3824                          instance.hypervisor)
3825
3826     # check bridge existance
3827     _CheckInstanceBridgesExist(self, instance, node=target_node)
3828
3829     if not self.op.cleanup:
3830       _CheckNodeNotDrained(self, target_node)
3831       result = self.rpc.call_instance_migratable(instance.primary_node,
3832                                                  instance)
3833       result.Raise("Can't migrate, please use failover", prereq=True)
3834
3835     self.instance = instance
3836
3837   def _WaitUntilSync(self):
3838     """Poll with custom rpc for disk sync.
3839
3840     This uses our own step-based rpc call.
3841
3842     """
3843     self.feedback_fn("* wait until resync is done")
3844     all_done = False
3845     while not all_done:
3846       all_done = True
3847       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3848                                             self.nodes_ip,
3849                                             self.instance.disks)
3850       min_percent = 100
3851       for node, nres in result.items():
3852         nres.Raise("Cannot resync disks on node %s" % node)
3853         node_done, node_percent = nres.payload
3854         all_done = all_done and node_done
3855         if node_percent is not None:
3856           min_percent = min(min_percent, node_percent)
3857       if not all_done:
3858         if min_percent < 100:
3859           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3860         time.sleep(2)
3861
3862   def _EnsureSecondary(self, node):
3863     """Demote a node to secondary.
3864
3865     """
3866     self.feedback_fn("* switching node %s to secondary mode" % node)
3867
3868     for dev in self.instance.disks:
3869       self.cfg.SetDiskID(dev, node)
3870
3871     result = self.rpc.call_blockdev_close(node, self.instance.name,
3872                                           self.instance.disks)
3873     result.Raise("Cannot change disk to secondary on node %s" % node)
3874
3875   def _GoStandalone(self):
3876     """Disconnect from the network.
3877
3878     """
3879     self.feedback_fn("* changing into standalone mode")
3880     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3881                                                self.instance.disks)
3882     for node, nres in result.items():
3883       nres.Raise("Cannot disconnect disks node %s" % node)
3884
3885   def _GoReconnect(self, multimaster):
3886     """Reconnect to the network.
3887
3888     """
3889     if multimaster:
3890       msg = "dual-master"
3891     else:
3892       msg = "single-master"
3893     self.feedback_fn("* changing disks into %s mode" % msg)
3894     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3895                                            self.instance.disks,
3896                                            self.instance.name, multimaster)
3897     for node, nres in result.items():
3898       nres.Raise("Cannot change disks config on node %s" % node)
3899
3900   def _ExecCleanup(self):
3901     """Try to cleanup after a failed migration.
3902
3903     The cleanup is done by:
3904       - check that the instance is running only on one node
3905         (and update the config if needed)
3906       - change disks on its secondary node to secondary
3907       - wait until disks are fully synchronized
3908       - disconnect from the network
3909       - change disks into single-master mode
3910       - wait again until disks are fully synchronized
3911
3912     """
3913     instance = self.instance
3914     target_node = self.target_node
3915     source_node = self.source_node
3916
3917     # check running on only one node
3918     self.feedback_fn("* checking where the instance actually runs"
3919                      " (if this hangs, the hypervisor might be in"
3920                      " a bad state)")
3921     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3922     for node, result in ins_l.items():
3923       result.Raise("Can't contact node %s" % node)
3924
3925     runningon_source = instance.name in ins_l[source_node].payload
3926     runningon_target = instance.name in ins_l[target_node].payload
3927
3928     if runningon_source and runningon_target:
3929       raise errors.OpExecError("Instance seems to be running on two nodes,"
3930                                " or the hypervisor is confused. You will have"
3931                                " to ensure manually that it runs only on one"
3932                                " and restart this operation.")
3933
3934     if not (runningon_source or runningon_target):
3935       raise errors.OpExecError("Instance does not seem to be running at all."
3936                                " In this case, it's safer to repair by"
3937                                " running 'gnt-instance stop' to ensure disk"
3938                                " shutdown, and then restarting it.")
3939
3940     if runningon_target:
3941       # the migration has actually succeeded, we need to update the config
3942       self.feedback_fn("* instance running on secondary node (%s),"
3943                        " updating config" % target_node)
3944       instance.primary_node = target_node
3945       self.cfg.Update(instance)
3946       demoted_node = source_node
3947     else:
3948       self.feedback_fn("* instance confirmed to be running on its"
3949                        " primary node (%s)" % source_node)
3950       demoted_node = target_node
3951
3952     self._EnsureSecondary(demoted_node)
3953     try:
3954       self._WaitUntilSync()
3955     except errors.OpExecError:
3956       # we ignore here errors, since if the device is standalone, it
3957       # won't be able to sync
3958       pass
3959     self._GoStandalone()
3960     self._GoReconnect(False)
3961     self._WaitUntilSync()
3962
3963     self.feedback_fn("* done")
3964
3965   def _RevertDiskStatus(self):
3966     """Try to revert the disk status after a failed migration.
3967
3968     """
3969     target_node = self.target_node
3970     try:
3971       self._EnsureSecondary(target_node)
3972       self._GoStandalone()
3973       self._GoReconnect(False)
3974       self._WaitUntilSync()
3975     except errors.OpExecError, err:
3976       self.LogWarning("Migration failed and I can't reconnect the"
3977                       " drives: error '%s'\n"
3978                       "Please look and recover the instance status" %
3979                       str(err))
3980
3981   def _AbortMigration(self):
3982     """Call the hypervisor code to abort a started migration.
3983
3984     """
3985     instance = self.instance
3986     target_node = self.target_node
3987     migration_info = self.migration_info
3988
3989     abort_result = self.rpc.call_finalize_migration(target_node,
3990                                                     instance,
3991                                                     migration_info,
3992                                                     False)
3993     abort_msg = abort_result.fail_msg
3994     if abort_msg:
3995       logging.error("Aborting migration failed on target node %s: %s" %
3996                     (target_node, abort_msg))
3997       # Don't raise an exception here, as we stil have to try to revert the
3998       # disk status, even if this step failed.
3999
4000   def _ExecMigration(self):
4001     """Migrate an instance.
4002
4003     The migrate is done by:
4004       - change the disks into dual-master mode
4005       - wait until disks are fully synchronized again
4006       - migrate the instance
4007       - change disks on the new secondary node (the old primary) to secondary
4008       - wait until disks are fully synchronized
4009       - change disks into single-master mode
4010
4011     """
4012     instance = self.instance
4013     target_node = self.target_node
4014     source_node = self.source_node
4015
4016     self.feedback_fn("* checking disk consistency between source and target")
4017     for dev in instance.disks:
4018       if not _CheckDiskConsistency(self, dev, target_node, False):
4019         raise errors.OpExecError("Disk %s is degraded or not fully"
4020                                  " synchronized on target node,"
4021                                  " aborting migrate." % dev.iv_name)
4022
4023     # First get the migration information from the remote node
4024     result = self.rpc.call_migration_info(source_node, instance)
4025     msg = result.fail_msg
4026     if msg:
4027       log_err = ("Failed fetching source migration information from %s: %s" %
4028                  (source_node, msg))
4029       logging.error(log_err)
4030       raise errors.OpExecError(log_err)
4031
4032     self.migration_info = migration_info = result.payload
4033
4034     # Then switch the disks to master/master mode
4035     self._EnsureSecondary(target_node)
4036     self._GoStandalone()
4037     self._GoReconnect(True)
4038     self._WaitUntilSync()
4039
4040     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4041     result = self.rpc.call_accept_instance(target_node,
4042                                            instance,
4043                                            migration_info,
4044                                            self.nodes_ip[target_node])
4045
4046     msg = result.fail_msg
4047     if msg:
4048       logging.error("Instance pre-migration failed, trying to revert"
4049                     " disk status: %s", msg)
4050       self._AbortMigration()
4051       self._RevertDiskStatus()
4052       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4053                                (instance.name, msg))
4054
4055     self.feedback_fn("* migrating instance to %s" % target_node)
4056     time.sleep(10)
4057     result = self.rpc.call_instance_migrate(source_node, instance,
4058                                             self.nodes_ip[target_node],
4059                                             self.op.live)
4060     msg = result.fail_msg
4061     if msg:
4062       logging.error("Instance migration failed, trying to revert"
4063                     " disk status: %s", msg)
4064       self._AbortMigration()
4065       self._RevertDiskStatus()
4066       raise errors.OpExecError("Could not migrate instance %s: %s" %
4067                                (instance.name, msg))
4068     time.sleep(10)
4069
4070     instance.primary_node = target_node
4071     # distribute new instance config to the other nodes
4072     self.cfg.Update(instance)
4073
4074     result = self.rpc.call_finalize_migration(target_node,
4075                                               instance,
4076                                               migration_info,
4077                                               True)
4078     msg = result.fail_msg
4079     if msg:
4080       logging.error("Instance migration succeeded, but finalization failed:"
4081                     " %s" % msg)
4082       raise errors.OpExecError("Could not finalize instance migration: %s" %
4083                                msg)
4084
4085     self._EnsureSecondary(source_node)
4086     self._WaitUntilSync()
4087     self._GoStandalone()
4088     self._GoReconnect(False)
4089     self._WaitUntilSync()
4090
4091     self.feedback_fn("* done")
4092
4093   def Exec(self, feedback_fn):
4094     """Perform the migration.
4095
4096     """
4097     self.feedback_fn = feedback_fn
4098
4099     self.source_node = self.instance.primary_node
4100     self.target_node = self.instance.secondary_nodes[0]
4101     self.all_nodes = [self.source_node, self.target_node]
4102     self.nodes_ip = {
4103       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4104       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4105       }
4106     if self.op.cleanup:
4107       return self._ExecCleanup()
4108     else:
4109       return self._ExecMigration()
4110
4111
4112 def _CreateBlockDev(lu, node, instance, device, force_create,
4113                     info, force_open):
4114   """Create a tree of block devices on a given node.
4115
4116   If this device type has to be created on secondaries, create it and
4117   all its children.
4118
4119   If not, just recurse to children keeping the same 'force' value.
4120
4121   @param lu: the lu on whose behalf we execute
4122   @param node: the node on which to create the device
4123   @type instance: L{objects.Instance}
4124   @param instance: the instance which owns the device
4125   @type device: L{objects.Disk}
4126   @param device: the device to create
4127   @type force_create: boolean
4128   @param force_create: whether to force creation of this device; this
4129       will be change to True whenever we find a device which has
4130       CreateOnSecondary() attribute
4131   @param info: the extra 'metadata' we should attach to the device
4132       (this will be represented as a LVM tag)
4133   @type force_open: boolean
4134   @param force_open: this parameter will be passes to the
4135       L{backend.BlockdevCreate} function where it specifies
4136       whether we run on primary or not, and it affects both
4137       the child assembly and the device own Open() execution
4138
4139   """
4140   if device.CreateOnSecondary():
4141     force_create = True
4142
4143   if device.children:
4144     for child in device.children:
4145       _CreateBlockDev(lu, node, instance, child, force_create,
4146                       info, force_open)
4147
4148   if not force_create:
4149     return
4150
4151   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4152
4153
4154 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4155   """Create a single block device on a given node.
4156
4157   This will not recurse over children of the device, so they must be
4158   created in advance.
4159
4160   @param lu: the lu on whose behalf we execute
4161   @param node: the node on which to create the device
4162   @type instance: L{objects.Instance}
4163   @param instance: the instance which owns the device
4164   @type device: L{objects.Disk}
4165   @param device: the device to create
4166   @param info: the extra 'metadata' we should attach to the device
4167       (this will be represented as a LVM tag)
4168   @type force_open: boolean
4169   @param force_open: this parameter will be passes to the
4170       L{backend.BlockdevCreate} function where it specifies
4171       whether we run on primary or not, and it affects both
4172       the child assembly and the device own Open() execution
4173
4174   """
4175   lu.cfg.SetDiskID(device, node)
4176   result = lu.rpc.call_blockdev_create(node, device, device.size,
4177                                        instance.name, force_open, info)
4178   result.Raise("Can't create block device %s on"
4179                " node %s for instance %s" % (device, node, instance.name))
4180   if device.physical_id is None:
4181     device.physical_id = result.payload
4182
4183
4184 def _GenerateUniqueNames(lu, exts):
4185   """Generate a suitable LV name.
4186
4187   This will generate a logical volume name for the given instance.
4188
4189   """
4190   results = []
4191   for val in exts:
4192     new_id = lu.cfg.GenerateUniqueID()
4193     results.append("%s%s" % (new_id, val))
4194   return results
4195
4196
4197 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4198                          p_minor, s_minor):
4199   """Generate a drbd8 device complete with its children.
4200
4201   """
4202   port = lu.cfg.AllocatePort()
4203   vgname = lu.cfg.GetVGName()
4204   shared_secret = lu.cfg.GenerateDRBDSecret()
4205   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4206                           logical_id=(vgname, names[0]))
4207   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4208                           logical_id=(vgname, names[1]))
4209   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4210                           logical_id=(primary, secondary, port,
4211                                       p_minor, s_minor,
4212                                       shared_secret),
4213                           children=[dev_data, dev_meta],
4214                           iv_name=iv_name)
4215   return drbd_dev
4216
4217
4218 def _GenerateDiskTemplate(lu, template_name,
4219                           instance_name, primary_node,
4220                           secondary_nodes, disk_info,
4221                           file_storage_dir, file_driver,
4222                           base_index):
4223   """Generate the entire disk layout for a given template type.
4224
4225   """
4226   #TODO: compute space requirements
4227
4228   vgname = lu.cfg.GetVGName()
4229   disk_count = len(disk_info)
4230   disks = []
4231   if template_name == constants.DT_DISKLESS:
4232     pass
4233   elif template_name == constants.DT_PLAIN:
4234     if len(secondary_nodes) != 0:
4235       raise errors.ProgrammerError("Wrong template configuration")
4236
4237     names = _GenerateUniqueNames(lu, [".disk%d" % i
4238                                       for i in range(disk_count)])
4239     for idx, disk in enumerate(disk_info):
4240       disk_index = idx + base_index
4241       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4242                               logical_id=(vgname, names[idx]),
4243                               iv_name="disk/%d" % disk_index,
4244                               mode=disk["mode"])
4245       disks.append(disk_dev)
4246   elif template_name == constants.DT_DRBD8:
4247     if len(secondary_nodes) != 1:
4248       raise errors.ProgrammerError("Wrong template configuration")
4249     remote_node = secondary_nodes[0]
4250     minors = lu.cfg.AllocateDRBDMinor(
4251       [primary_node, remote_node] * len(disk_info), instance_name)
4252
4253     names = []
4254     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4255                                                for i in range(disk_count)]):
4256       names.append(lv_prefix + "_data")
4257       names.append(lv_prefix + "_meta")
4258     for idx, disk in enumerate(disk_info):
4259       disk_index = idx + base_index
4260       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4261                                       disk["size"], names[idx*2:idx*2+2],
4262                                       "disk/%d" % disk_index,
4263                                       minors[idx*2], minors[idx*2+1])
4264       disk_dev.mode = disk["mode"]
4265       disks.append(disk_dev)
4266   elif template_name == constants.DT_FILE:
4267     if len(secondary_nodes) != 0:
4268       raise errors.ProgrammerError("Wrong template configuration")
4269
4270     for idx, disk in enumerate(disk_info):
4271       disk_index = idx + base_index
4272       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4273                               iv_name="disk/%d" % disk_index,
4274                               logical_id=(file_driver,
4275                                           "%s/disk%d" % (file_storage_dir,
4276                                                          disk_index)),
4277                               mode=disk["mode"])
4278       disks.append(disk_dev)
4279   else:
4280     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4281   return disks
4282
4283
4284 def _GetInstanceInfoText(instance):
4285   """Compute that text that should be added to the disk's metadata.
4286
4287   """
4288   return "originstname+%s" % instance.name
4289
4290
4291 def _CreateDisks(lu, instance):
4292   """Create all disks for an instance.
4293
4294   This abstracts away some work from AddInstance.
4295
4296   @type lu: L{LogicalUnit}
4297   @param lu: the logical unit on whose behalf we execute
4298   @type instance: L{objects.Instance}
4299   @param instance: the instance whose disks we should create
4300   @rtype: boolean
4301   @return: the success of the creation
4302
4303   """
4304   info = _GetInstanceInfoText(instance)
4305   pnode = instance.primary_node
4306
4307   if instance.disk_template == constants.DT_FILE:
4308     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4309     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4310
4311     result.Raise("Failed to create directory '%s' on"
4312                  " node %s: %s" % (file_storage_dir, pnode))
4313
4314   # Note: this needs to be kept in sync with adding of disks in
4315   # LUSetInstanceParams
4316   for device in instance.disks:
4317     logging.info("Creating volume %s for instance %s",
4318                  device.iv_name, instance.name)
4319     #HARDCODE
4320     for node in instance.all_nodes:
4321       f_create = node == pnode
4322       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4323
4324
4325 def _RemoveDisks(lu, instance):
4326   """Remove all disks for an instance.
4327
4328   This abstracts away some work from `AddInstance()` and
4329   `RemoveInstance()`. Note that in case some of the devices couldn't
4330   be removed, the removal will continue with the other ones (compare
4331   with `_CreateDisks()`).
4332
4333   @type lu: L{LogicalUnit}
4334   @param lu: the logical unit on whose behalf we execute
4335   @type instance: L{objects.Instance}
4336   @param instance: the instance whose disks we should remove
4337   @rtype: boolean
4338   @return: the success of the removal
4339
4340   """
4341   logging.info("Removing block devices for instance %s", instance.name)
4342
4343   all_result = True
4344   for device in instance.disks:
4345     for node, disk in device.ComputeNodeTree(instance.primary_node):
4346       lu.cfg.SetDiskID(disk, node)
4347       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4348       if msg:
4349         lu.LogWarning("Could not remove block device %s on node %s,"
4350                       " continuing anyway: %s", device.iv_name, node, msg)
4351         all_result = False
4352
4353   if instance.disk_template == constants.DT_FILE:
4354     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4355     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4356                                                  file_storage_dir)
4357     msg = result.fail_msg
4358     if msg:
4359       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4360                     file_storage_dir, instance.primary_node, msg)
4361       all_result = False
4362
4363   return all_result
4364
4365
4366 def _ComputeDiskSize(disk_template, disks):
4367   """Compute disk size requirements in the volume group
4368
4369   """
4370   # Required free disk space as a function of disk and swap space
4371   req_size_dict = {
4372     constants.DT_DISKLESS: None,
4373     constants.DT_PLAIN: sum(d["size"] for d in disks),
4374     # 128 MB are added for drbd metadata for each disk
4375     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4376     constants.DT_FILE: None,
4377   }
4378
4379   if disk_template not in req_size_dict:
4380     raise errors.ProgrammerError("Disk template '%s' size requirement"
4381                                  " is unknown" %  disk_template)
4382
4383   return req_size_dict[disk_template]
4384
4385
4386 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4387   """Hypervisor parameter validation.
4388
4389   This function abstract the hypervisor parameter validation to be
4390   used in both instance create and instance modify.
4391
4392   @type lu: L{LogicalUnit}
4393   @param lu: the logical unit for which we check
4394   @type nodenames: list
4395   @param nodenames: the list of nodes on which we should check
4396   @type hvname: string
4397   @param hvname: the name of the hypervisor we should use
4398   @type hvparams: dict
4399   @param hvparams: the parameters which we need to check
4400   @raise errors.OpPrereqError: if the parameters are not valid
4401
4402   """
4403   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4404                                                   hvname,
4405                                                   hvparams)
4406   for node in nodenames:
4407     info = hvinfo[node]
4408     if info.offline:
4409       continue
4410     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4411
4412
4413 class LUCreateInstance(LogicalUnit):
4414   """Create an instance.
4415
4416   """
4417   HPATH = "instance-add"
4418   HTYPE = constants.HTYPE_INSTANCE
4419   _OP_REQP = ["instance_name", "disks", "disk_template",
4420               "mode", "start",
4421               "wait_for_sync", "ip_check", "nics",
4422               "hvparams", "beparams"]
4423   REQ_BGL = False
4424
4425   def _ExpandNode(self, node):
4426     """Expands and checks one node name.
4427
4428     """
4429     node_full = self.cfg.ExpandNodeName(node)
4430     if node_full is None:
4431       raise errors.OpPrereqError("Unknown node %s" % node)
4432     return node_full
4433
4434   def ExpandNames(self):
4435     """ExpandNames for CreateInstance.
4436
4437     Figure out the right locks for instance creation.
4438
4439     """
4440     self.needed_locks = {}
4441
4442     # set optional parameters to none if they don't exist
4443     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4444       if not hasattr(self.op, attr):
4445         setattr(self.op, attr, None)
4446
4447     # cheap checks, mostly valid constants given
4448
4449     # verify creation mode
4450     if self.op.mode not in (constants.INSTANCE_CREATE,
4451                             constants.INSTANCE_IMPORT):
4452       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4453                                  self.op.mode)
4454
4455     # disk template and mirror node verification
4456     if self.op.disk_template not in constants.DISK_TEMPLATES:
4457       raise errors.OpPrereqError("Invalid disk template name")
4458
4459     if self.op.hypervisor is None:
4460       self.op.hypervisor = self.cfg.GetHypervisorType()
4461
4462     cluster = self.cfg.GetClusterInfo()
4463     enabled_hvs = cluster.enabled_hypervisors
4464     if self.op.hypervisor not in enabled_hvs:
4465       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4466                                  " cluster (%s)" % (self.op.hypervisor,
4467                                   ",".join(enabled_hvs)))
4468
4469     # check hypervisor parameter syntax (locally)
4470     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4471     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4472                                   self.op.hvparams)
4473     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4474     hv_type.CheckParameterSyntax(filled_hvp)
4475     self.hv_full = filled_hvp
4476
4477     # fill and remember the beparams dict
4478     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4479     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4480                                     self.op.beparams)
4481
4482     #### instance parameters check
4483
4484     # instance name verification
4485     hostname1 = utils.HostInfo(self.op.instance_name)
4486     self.op.instance_name = instance_name = hostname1.name
4487
4488     # this is just a preventive check, but someone might still add this
4489     # instance in the meantime, and creation will fail at lock-add time
4490     if instance_name in self.cfg.GetInstanceList():
4491       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4492                                  instance_name)
4493
4494     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4495
4496     # NIC buildup
4497     self.nics = []
4498     for idx, nic in enumerate(self.op.nics):
4499       nic_mode_req = nic.get("mode", None)
4500       nic_mode = nic_mode_req
4501       if nic_mode is None:
4502         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4503
4504       # in routed mode, for the first nic, the default ip is 'auto'
4505       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4506         default_ip_mode = constants.VALUE_AUTO
4507       else:
4508         default_ip_mode = constants.VALUE_NONE
4509
4510       # ip validity checks
4511       ip = nic.get("ip", default_ip_mode)
4512       if ip is None or ip.lower() == constants.VALUE_NONE:
4513         nic_ip = None
4514       elif ip.lower() == constants.VALUE_AUTO:
4515         nic_ip = hostname1.ip
4516       else:
4517         if not utils.IsValidIP(ip):
4518           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4519                                      " like a valid IP" % ip)
4520         nic_ip = ip
4521
4522       # TODO: check the ip for uniqueness !!
4523       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4524         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4525
4526       # MAC address verification
4527       mac = nic.get("mac", constants.VALUE_AUTO)
4528       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4529         if not utils.IsValidMac(mac.lower()):
4530           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4531                                      mac)
4532       # bridge verification
4533       bridge = nic.get("bridge", None)
4534       link = nic.get("link", None)
4535       if bridge and link:
4536         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4537                                    " at the same time")
4538       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4539         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4540       elif bridge:
4541         link = bridge
4542
4543       nicparams = {}
4544       if nic_mode_req:
4545         nicparams[constants.NIC_MODE] = nic_mode_req
4546       if link:
4547         nicparams[constants.NIC_LINK] = link
4548
4549       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4550                                       nicparams)
4551       objects.NIC.CheckParameterSyntax(check_params)
4552       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4553
4554     # disk checks/pre-build
4555     self.disks = []
4556     for disk in self.op.disks:
4557       mode = disk.get("mode", constants.DISK_RDWR)
4558       if mode not in constants.DISK_ACCESS_SET:
4559         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4560                                    mode)
4561       size = disk.get("size", None)
4562       if size is None:
4563         raise errors.OpPrereqError("Missing disk size")
4564       try:
4565         size = int(size)
4566       except ValueError:
4567         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4568       self.disks.append({"size": size, "mode": mode})
4569
4570     # used in CheckPrereq for ip ping check
4571     self.check_ip = hostname1.ip
4572
4573     # file storage checks
4574     if (self.op.file_driver and
4575         not self.op.file_driver in constants.FILE_DRIVER):
4576       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4577                                  self.op.file_driver)
4578
4579     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4580       raise errors.OpPrereqError("File storage directory path not absolute")
4581
4582     ### Node/iallocator related checks
4583     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4584       raise errors.OpPrereqError("One and only one of iallocator and primary"
4585                                  " node must be given")
4586
4587     if self.op.iallocator:
4588       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4589     else:
4590       self.op.pnode = self._ExpandNode(self.op.pnode)
4591       nodelist = [self.op.pnode]
4592       if self.op.snode is not None:
4593         self.op.snode = self._ExpandNode(self.op.snode)
4594         nodelist.append(self.op.snode)
4595       self.needed_locks[locking.LEVEL_NODE] = nodelist
4596
4597     # in case of import lock the source node too
4598     if self.op.mode == constants.INSTANCE_IMPORT:
4599       src_node = getattr(self.op, "src_node", None)
4600       src_path = getattr(self.op, "src_path", None)
4601
4602       if src_path is None:
4603         self.op.src_path = src_path = self.op.instance_name
4604
4605       if src_node is None:
4606         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4607         self.op.src_node = None
4608         if os.path.isabs(src_path):
4609           raise errors.OpPrereqError("Importing an instance from an absolute"
4610                                      " path requires a source node option.")
4611       else:
4612         self.op.src_node = src_node = self._ExpandNode(src_node)
4613         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4614           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4615         if not os.path.isabs(src_path):
4616           self.op.src_path = src_path = \
4617             os.path.join(constants.EXPORT_DIR, src_path)
4618
4619     else: # INSTANCE_CREATE
4620       if getattr(self.op, "os_type", None) is None:
4621         raise errors.OpPrereqError("No guest OS specified")
4622
4623   def _RunAllocator(self):
4624     """Run the allocator based on input opcode.
4625
4626     """
4627     nics = [n.ToDict() for n in self.nics]
4628     ial = IAllocator(self,
4629                      mode=constants.IALLOCATOR_MODE_ALLOC,
4630                      name=self.op.instance_name,
4631                      disk_template=self.op.disk_template,
4632                      tags=[],
4633                      os=self.op.os_type,
4634                      vcpus=self.be_full[constants.BE_VCPUS],
4635                      mem_size=self.be_full[constants.BE_MEMORY],
4636                      disks=self.disks,
4637                      nics=nics,
4638                      hypervisor=self.op.hypervisor,
4639                      )
4640
4641     ial.Run(self.op.iallocator)
4642
4643     if not ial.success:
4644       raise errors.OpPrereqError("Can't compute nodes using"
4645                                  " iallocator '%s': %s" % (self.op.iallocator,
4646                                                            ial.info))
4647     if len(ial.nodes) != ial.required_nodes:
4648       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4649                                  " of nodes (%s), required %s" %
4650                                  (self.op.iallocator, len(ial.nodes),
4651                                   ial.required_nodes))
4652     self.op.pnode = ial.nodes[0]
4653     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4654                  self.op.instance_name, self.op.iallocator,
4655                  ", ".join(ial.nodes))
4656     if ial.required_nodes == 2:
4657       self.op.snode = ial.nodes[1]
4658
4659   def BuildHooksEnv(self):
4660     """Build hooks env.
4661
4662     This runs on master, primary and secondary nodes of the instance.
4663
4664     """
4665     env = {
4666       "ADD_MODE": self.op.mode,
4667       }
4668     if self.op.mode == constants.INSTANCE_IMPORT:
4669       env["SRC_NODE"] = self.op.src_node
4670       env["SRC_PATH"] = self.op.src_path
4671       env["SRC_IMAGES"] = self.src_images
4672
4673     env.update(_BuildInstanceHookEnv(
4674       name=self.op.instance_name,
4675       primary_node=self.op.pnode,
4676       secondary_nodes=self.secondaries,
4677       status=self.op.start,
4678       os_type=self.op.os_type,
4679       memory=self.be_full[constants.BE_MEMORY],
4680       vcpus=self.be_full[constants.BE_VCPUS],
4681       nics=_NICListToTuple(self, self.nics),
4682       disk_template=self.op.disk_template,
4683       disks=[(d["size"], d["mode"]) for d in self.disks],
4684       bep=self.be_full,
4685       hvp=self.hv_full,
4686       hypervisor=self.op.hypervisor,
4687     ))
4688
4689     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4690           self.secondaries)
4691     return env, nl, nl
4692
4693
4694   def CheckPrereq(self):
4695     """Check prerequisites.
4696
4697     """
4698     if (not self.cfg.GetVGName() and
4699         self.op.disk_template not in constants.DTS_NOT_LVM):
4700       raise errors.OpPrereqError("Cluster does not support lvm-based"
4701                                  " instances")
4702
4703     if self.op.mode == constants.INSTANCE_IMPORT:
4704       src_node = self.op.src_node
4705       src_path = self.op.src_path
4706
4707       if src_node is None:
4708         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4709         exp_list = self.rpc.call_export_list(locked_nodes)
4710         found = False
4711         for node in exp_list:
4712           if exp_list[node].fail_msg:
4713             continue
4714           if src_path in exp_list[node].payload:
4715             found = True
4716             self.op.src_node = src_node = node
4717             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4718                                                        src_path)
4719             break
4720         if not found:
4721           raise errors.OpPrereqError("No export found for relative path %s" %
4722                                       src_path)
4723
4724       _CheckNodeOnline(self, src_node)
4725       result = self.rpc.call_export_info(src_node, src_path)
4726       result.Raise("No export or invalid export found in dir %s" % src_path)
4727
4728       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4729       if not export_info.has_section(constants.INISECT_EXP):
4730         raise errors.ProgrammerError("Corrupted export config")
4731
4732       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4733       if (int(ei_version) != constants.EXPORT_VERSION):
4734         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4735                                    (ei_version, constants.EXPORT_VERSION))
4736
4737       # Check that the new instance doesn't have less disks than the export
4738       instance_disks = len(self.disks)
4739       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4740       if instance_disks < export_disks:
4741         raise errors.OpPrereqError("Not enough disks to import."
4742                                    " (instance: %d, export: %d)" %
4743                                    (instance_disks, export_disks))
4744
4745       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4746       disk_images = []
4747       for idx in range(export_disks):
4748         option = 'disk%d_dump' % idx
4749         if export_info.has_option(constants.INISECT_INS, option):
4750           # FIXME: are the old os-es, disk sizes, etc. useful?
4751           export_name = export_info.get(constants.INISECT_INS, option)
4752           image = os.path.join(src_path, export_name)
4753           disk_images.append(image)
4754         else:
4755           disk_images.append(False)
4756
4757       self.src_images = disk_images
4758
4759       old_name = export_info.get(constants.INISECT_INS, 'name')
4760       # FIXME: int() here could throw a ValueError on broken exports
4761       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4762       if self.op.instance_name == old_name:
4763         for idx, nic in enumerate(self.nics):
4764           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4765             nic_mac_ini = 'nic%d_mac' % idx
4766             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4767
4768     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4769     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4770     if self.op.start and not self.op.ip_check:
4771       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4772                                  " adding an instance in start mode")
4773
4774     if self.op.ip_check:
4775       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4776         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4777                                    (self.check_ip, self.op.instance_name))
4778
4779     #### mac address generation
4780     # By generating here the mac address both the allocator and the hooks get
4781     # the real final mac address rather than the 'auto' or 'generate' value.
4782     # There is a race condition between the generation and the instance object
4783     # creation, which means that we know the mac is valid now, but we're not
4784     # sure it will be when we actually add the instance. If things go bad
4785     # adding the instance will abort because of a duplicate mac, and the
4786     # creation job will fail.
4787     for nic in self.nics:
4788       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4789         nic.mac = self.cfg.GenerateMAC()
4790
4791     #### allocator run
4792
4793     if self.op.iallocator is not None:
4794       self._RunAllocator()
4795
4796     #### node related checks
4797
4798     # check primary node
4799     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4800     assert self.pnode is not None, \
4801       "Cannot retrieve locked node %s" % self.op.pnode
4802     if pnode.offline:
4803       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4804                                  pnode.name)
4805     if pnode.drained:
4806       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4807                                  pnode.name)
4808
4809     self.secondaries = []
4810
4811     # mirror node verification
4812     if self.op.disk_template in constants.DTS_NET_MIRROR:
4813       if self.op.snode is None:
4814         raise errors.OpPrereqError("The networked disk templates need"
4815                                    " a mirror node")
4816       if self.op.snode == pnode.name:
4817         raise errors.OpPrereqError("The secondary node cannot be"
4818                                    " the primary node.")
4819       _CheckNodeOnline(self, self.op.snode)
4820       _CheckNodeNotDrained(self, self.op.snode)
4821       self.secondaries.append(self.op.snode)
4822
4823     nodenames = [pnode.name] + self.secondaries
4824
4825     req_size = _ComputeDiskSize(self.op.disk_template,
4826                                 self.disks)
4827
4828     # Check lv size requirements
4829     if req_size is not None:
4830       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4831                                          self.op.hypervisor)
4832       for node in nodenames:
4833         info = nodeinfo[node]
4834         info.Raise("Cannot get current information from node %s" % node)
4835         info = info.payload
4836         vg_free = info.get('vg_free', None)
4837         if not isinstance(vg_free, int):
4838           raise errors.OpPrereqError("Can't compute free disk space on"
4839                                      " node %s" % node)
4840         if req_size > vg_free:
4841           raise errors.OpPrereqError("Not enough disk space on target node %s."
4842                                      " %d MB available, %d MB required" %
4843                                      (node, vg_free, req_size))
4844
4845     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4846
4847     # os verification
4848     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4849     result.Raise("OS '%s' not in supported os list for primary node %s" %
4850                  (self.op.os_type, pnode.name), prereq=True)
4851
4852     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4853
4854     # memory check on primary node
4855     if self.op.start:
4856       _CheckNodeFreeMemory(self, self.pnode.name,
4857                            "creating instance %s" % self.op.instance_name,
4858                            self.be_full[constants.BE_MEMORY],
4859                            self.op.hypervisor)
4860
4861   def Exec(self, feedback_fn):
4862     """Create and add the instance to the cluster.
4863
4864     """
4865     instance = self.op.instance_name
4866     pnode_name = self.pnode.name
4867
4868     ht_kind = self.op.hypervisor
4869     if ht_kind in constants.HTS_REQ_PORT:
4870       network_port = self.cfg.AllocatePort()
4871     else:
4872       network_port = None
4873
4874     ##if self.op.vnc_bind_address is None:
4875     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4876
4877     # this is needed because os.path.join does not accept None arguments
4878     if self.op.file_storage_dir is None:
4879       string_file_storage_dir = ""
4880     else:
4881       string_file_storage_dir = self.op.file_storage_dir
4882
4883     # build the full file storage dir path
4884     file_storage_dir = os.path.normpath(os.path.join(
4885                                         self.cfg.GetFileStorageDir(),
4886                                         string_file_storage_dir, instance))
4887
4888
4889     disks = _GenerateDiskTemplate(self,
4890                                   self.op.disk_template,
4891                                   instance, pnode_name,
4892                                   self.secondaries,
4893                                   self.disks,
4894                                   file_storage_dir,
4895                                   self.op.file_driver,
4896                                   0)
4897
4898     iobj = objects.Instance(name=instance, os=self.op.os_type,
4899                             primary_node=pnode_name,
4900                             nics=self.nics, disks=disks,
4901                             disk_template=self.op.disk_template,
4902                             admin_up=False,
4903                             network_port=network_port,
4904                             beparams=self.op.beparams,
4905                             hvparams=self.op.hvparams,
4906                             hypervisor=self.op.hypervisor,
4907                             )
4908
4909     feedback_fn("* creating instance disks...")
4910     try:
4911       _CreateDisks(self, iobj)
4912     except errors.OpExecError:
4913       self.LogWarning("Device creation failed, reverting...")
4914       try:
4915         _RemoveDisks(self, iobj)
4916       finally:
4917         self.cfg.ReleaseDRBDMinors(instance)
4918         raise
4919
4920     feedback_fn("adding instance %s to cluster config" % instance)
4921
4922     self.cfg.AddInstance(iobj)
4923     # Declare that we don't want to remove the instance lock anymore, as we've
4924     # added the instance to the config
4925     del self.remove_locks[locking.LEVEL_INSTANCE]
4926     # Unlock all the nodes
4927     if self.op.mode == constants.INSTANCE_IMPORT:
4928       nodes_keep = [self.op.src_node]
4929       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4930                        if node != self.op.src_node]
4931       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4932       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4933     else:
4934       self.context.glm.release(locking.LEVEL_NODE)
4935       del self.acquired_locks[locking.LEVEL_NODE]
4936
4937     if self.op.wait_for_sync:
4938       disk_abort = not _WaitForSync(self, iobj)
4939     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4940       # make sure the disks are not degraded (still sync-ing is ok)
4941       time.sleep(15)
4942       feedback_fn("* checking mirrors status")
4943       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4944     else:
4945       disk_abort = False
4946
4947     if disk_abort:
4948       _RemoveDisks(self, iobj)
4949       self.cfg.RemoveInstance(iobj.name)
4950       # Make sure the instance lock gets removed
4951       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4952       raise errors.OpExecError("There are some degraded disks for"
4953                                " this instance")
4954
4955     feedback_fn("creating os for instance %s on node %s" %
4956                 (instance, pnode_name))
4957
4958     if iobj.disk_template != constants.DT_DISKLESS:
4959       if self.op.mode == constants.INSTANCE_CREATE:
4960         feedback_fn("* running the instance OS create scripts...")
4961         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4962         result.Raise("Could not add os for instance %s"
4963                      " on node %s" % (instance, pnode_name))
4964
4965       elif self.op.mode == constants.INSTANCE_IMPORT:
4966         feedback_fn("* running the instance OS import scripts...")
4967         src_node = self.op.src_node
4968         src_images = self.src_images
4969         cluster_name = self.cfg.GetClusterName()
4970         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4971                                                          src_node, src_images,
4972                                                          cluster_name)
4973         msg = import_result.fail_msg
4974         if msg:
4975           self.LogWarning("Error while importing the disk images for instance"
4976                           " %s on node %s: %s" % (instance, pnode_name, msg))
4977       else:
4978         # also checked in the prereq part
4979         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4980                                      % self.op.mode)
4981
4982     if self.op.start:
4983       iobj.admin_up = True
4984       self.cfg.Update(iobj)
4985       logging.info("Starting instance %s on node %s", instance, pnode_name)
4986       feedback_fn("* starting instance...")
4987       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4988       result.Raise("Could not start instance")
4989
4990
4991 class LUConnectConsole(NoHooksLU):
4992   """Connect to an instance's console.
4993
4994   This is somewhat special in that it returns the command line that
4995   you need to run on the master node in order to connect to the
4996   console.
4997
4998   """
4999   _OP_REQP = ["instance_name"]
5000   REQ_BGL = False
5001
5002   def ExpandNames(self):
5003     self._ExpandAndLockInstance()
5004
5005   def CheckPrereq(self):
5006     """Check prerequisites.
5007
5008     This checks that the instance is in the cluster.
5009
5010     """
5011     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5012     assert self.instance is not None, \
5013       "Cannot retrieve locked instance %s" % self.op.instance_name
5014     _CheckNodeOnline(self, self.instance.primary_node)
5015
5016   def Exec(self, feedback_fn):
5017     """Connect to the console of an instance
5018
5019     """
5020     instance = self.instance
5021     node = instance.primary_node
5022
5023     node_insts = self.rpc.call_instance_list([node],
5024                                              [instance.hypervisor])[node]
5025     node_insts.Raise("Can't get node information from %s" % node)
5026
5027     if instance.name not in node_insts.payload:
5028       raise errors.OpExecError("Instance %s is not running." % instance.name)
5029
5030     logging.debug("Connecting to console of %s on %s", instance.name, node)
5031
5032     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5033     cluster = self.cfg.GetClusterInfo()
5034     # beparams and hvparams are passed separately, to avoid editing the
5035     # instance and then saving the defaults in the instance itself.
5036     hvparams = cluster.FillHV(instance)
5037     beparams = cluster.FillBE(instance)
5038     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5039
5040     # build ssh cmdline
5041     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5042
5043
5044 class LUReplaceDisks(LogicalUnit):
5045   """Replace the disks of an instance.
5046
5047   """
5048   HPATH = "mirrors-replace"
5049   HTYPE = constants.HTYPE_INSTANCE
5050   _OP_REQP = ["instance_name", "mode", "disks"]
5051   REQ_BGL = False
5052
5053   def CheckArguments(self):
5054     if not hasattr(self.op, "remote_node"):
5055       self.op.remote_node = None
5056     if not hasattr(self.op, "iallocator"):
5057       self.op.iallocator = None
5058
5059     # check for valid parameter combination
5060     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5061     if self.op.mode == constants.REPLACE_DISK_CHG:
5062       if cnt == 2:
5063         raise errors.OpPrereqError("When changing the secondary either an"
5064                                    " iallocator script must be used or the"
5065                                    " new node given")
5066       elif cnt == 0:
5067         raise errors.OpPrereqError("Give either the iallocator or the new"
5068                                    " secondary, not both")
5069     else: # not replacing the secondary
5070       if cnt != 2:
5071         raise errors.OpPrereqError("The iallocator and new node options can"
5072                                    " be used only when changing the"
5073                                    " secondary node")
5074
5075   def ExpandNames(self):
5076     self._ExpandAndLockInstance()
5077
5078     if self.op.iallocator is not None:
5079       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5080     elif self.op.remote_node is not None:
5081       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5082       if remote_node is None:
5083         raise errors.OpPrereqError("Node '%s' not known" %
5084                                    self.op.remote_node)
5085       self.op.remote_node = remote_node
5086       # Warning: do not remove the locking of the new secondary here
5087       # unless DRBD8.AddChildren is changed to work in parallel;
5088       # currently it doesn't since parallel invocations of
5089       # FindUnusedMinor will conflict
5090       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5091       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5092     else:
5093       self.needed_locks[locking.LEVEL_NODE] = []
5094       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5095
5096   def DeclareLocks(self, level):
5097     # If we're not already locking all nodes in the set we have to declare the
5098     # instance's primary/secondary nodes.
5099     if (level == locking.LEVEL_NODE and
5100         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5101       self._LockInstancesNodes()
5102
5103   def _RunAllocator(self):
5104     """Compute a new secondary node using an IAllocator.
5105
5106     """
5107     ial = IAllocator(self,
5108                      mode=constants.IALLOCATOR_MODE_RELOC,
5109                      name=self.op.instance_name,
5110                      relocate_from=[self.sec_node])
5111
5112     ial.Run(self.op.iallocator)
5113
5114     if not ial.success:
5115       raise errors.OpPrereqError("Can't compute nodes using"
5116                                  " iallocator '%s': %s" % (self.op.iallocator,
5117                                                            ial.info))
5118     if len(ial.nodes) != ial.required_nodes:
5119       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5120                                  " of nodes (%s), required %s" %
5121                                  (len(ial.nodes), ial.required_nodes))
5122     self.op.remote_node = ial.nodes[0]
5123     self.LogInfo("Selected new secondary for the instance: %s",
5124                  self.op.remote_node)
5125
5126   def BuildHooksEnv(self):
5127     """Build hooks env.
5128
5129     This runs on the master, the primary and all the secondaries.
5130
5131     """
5132     env = {
5133       "MODE": self.op.mode,
5134       "NEW_SECONDARY": self.op.remote_node,
5135       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5136       }
5137     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5138     nl = [
5139       self.cfg.GetMasterNode(),
5140       self.instance.primary_node,
5141       ]
5142     if self.op.remote_node is not None:
5143       nl.append(self.op.remote_node)
5144     return env, nl, nl
5145
5146   def CheckPrereq(self):
5147     """Check prerequisites.
5148
5149     This checks that the instance is in the cluster.
5150
5151     """
5152     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5153     assert instance is not None, \
5154       "Cannot retrieve locked instance %s" % self.op.instance_name
5155     self.instance = instance
5156
5157     if instance.disk_template != constants.DT_DRBD8:
5158       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5159                                  " instances")
5160
5161     if len(instance.secondary_nodes) != 1:
5162       raise errors.OpPrereqError("The instance has a strange layout,"
5163                                  " expected one secondary but found %d" %
5164                                  len(instance.secondary_nodes))
5165
5166     self.sec_node = instance.secondary_nodes[0]
5167
5168     if self.op.iallocator is not None:
5169       self._RunAllocator()
5170
5171     remote_node = self.op.remote_node
5172     if remote_node is not None:
5173       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5174       assert self.remote_node_info is not None, \
5175         "Cannot retrieve locked node %s" % remote_node
5176     else:
5177       self.remote_node_info = None
5178     if remote_node == instance.primary_node:
5179       raise errors.OpPrereqError("The specified node is the primary node of"
5180                                  " the instance.")
5181     elif remote_node == self.sec_node:
5182       raise errors.OpPrereqError("The specified node is already the"
5183                                  " secondary node of the instance.")
5184
5185     if self.op.mode == constants.REPLACE_DISK_PRI:
5186       n1 = self.tgt_node = instance.primary_node
5187       n2 = self.oth_node = self.sec_node
5188     elif self.op.mode == constants.REPLACE_DISK_SEC:
5189       n1 = self.tgt_node = self.sec_node
5190       n2 = self.oth_node = instance.primary_node
5191     elif self.op.mode == constants.REPLACE_DISK_CHG:
5192       n1 = self.new_node = remote_node
5193       n2 = self.oth_node = instance.primary_node
5194       self.tgt_node = self.sec_node
5195       _CheckNodeNotDrained(self, remote_node)
5196     else:
5197       raise errors.ProgrammerError("Unhandled disk replace mode")
5198
5199     _CheckNodeOnline(self, n1)
5200     _CheckNodeOnline(self, n2)
5201
5202     if not self.op.disks:
5203       self.op.disks = range(len(instance.disks))
5204
5205     for disk_idx in self.op.disks:
5206       instance.FindDisk(disk_idx)
5207
5208   def _ExecD8DiskOnly(self, feedback_fn):
5209     """Replace a disk on the primary or secondary for dbrd8.
5210
5211     The algorithm for replace is quite complicated:
5212
5213       1. for each disk to be replaced:
5214
5215         1. create new LVs on the target node with unique names
5216         1. detach old LVs from the drbd device
5217         1. rename old LVs to name_replaced.<time_t>
5218         1. rename new LVs to old LVs
5219         1. attach the new LVs (with the old names now) to the drbd device
5220
5221       1. wait for sync across all devices
5222
5223       1. for each modified disk:
5224
5225         1. remove old LVs (which have the name name_replaces.<time_t>)
5226
5227     Failures are not very well handled.
5228
5229     """
5230     steps_total = 6
5231     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5232     instance = self.instance
5233     iv_names = {}
5234     vgname = self.cfg.GetVGName()
5235     # start of work
5236     cfg = self.cfg
5237     tgt_node = self.tgt_node
5238     oth_node = self.oth_node
5239
5240     # Step: check device activation
5241     self.proc.LogStep(1, steps_total, "check device existence")
5242     info("checking volume groups")
5243     my_vg = cfg.GetVGName()
5244     results = self.rpc.call_vg_list([oth_node, tgt_node])
5245     if not results:
5246       raise errors.OpExecError("Can't list volume groups on the nodes")
5247     for node in oth_node, tgt_node:
5248       res = results[node]
5249       res.Raise("Error checking node %s" % node)
5250       if my_vg not in res.payload:
5251         raise errors.OpExecError("Volume group '%s' not found on %s" %
5252                                  (my_vg, node))
5253     for idx, dev in enumerate(instance.disks):
5254       if idx not in self.op.disks:
5255         continue
5256       for node in tgt_node, oth_node:
5257         info("checking disk/%d on %s" % (idx, node))
5258         cfg.SetDiskID(dev, node)
5259         result = self.rpc.call_blockdev_find(node, dev)
5260         msg = result.fail_msg
5261         if not msg and not result.payload:
5262           msg = "disk not found"
5263         if msg:
5264           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5265                                    (idx, node, msg))
5266
5267     # Step: check other node consistency
5268     self.proc.LogStep(2, steps_total, "check peer consistency")
5269     for idx, dev in enumerate(instance.disks):
5270       if idx not in self.op.disks:
5271         continue
5272       info("checking disk/%d consistency on %s" % (idx, oth_node))
5273       if not _CheckDiskConsistency(self, dev, oth_node,
5274                                    oth_node==instance.primary_node):
5275         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5276                                  " to replace disks on this node (%s)" %
5277                                  (oth_node, tgt_node))
5278
5279     # Step: create new storage
5280     self.proc.LogStep(3, steps_total, "allocate new storage")
5281     for idx, dev in enumerate(instance.disks):
5282       if idx not in self.op.disks:
5283         continue
5284       size = dev.size
5285       cfg.SetDiskID(dev, tgt_node)
5286       lv_names = [".disk%d_%s" % (idx, suf)
5287                   for suf in ["data", "meta"]]
5288       names = _GenerateUniqueNames(self, lv_names)
5289       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5290                              logical_id=(vgname, names[0]))
5291       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5292                              logical_id=(vgname, names[1]))
5293       new_lvs = [lv_data, lv_meta]
5294       old_lvs = dev.children
5295       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5296       info("creating new local storage on %s for %s" %
5297            (tgt_node, dev.iv_name))
5298       # we pass force_create=True to force the LVM creation
5299       for new_lv in new_lvs:
5300         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5301                         _GetInstanceInfoText(instance), False)
5302
5303     # Step: for each lv, detach+rename*2+attach
5304     self.proc.LogStep(4, steps_total, "change drbd configuration")
5305     for dev, old_lvs, new_lvs in iv_names.itervalues():
5306       info("detaching %s drbd from local storage" % dev.iv_name)
5307       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5308       result.Raise("Can't detach drbd from local storage on node"
5309                    " %s for device %s" % (tgt_node, dev.iv_name))
5310       #dev.children = []
5311       #cfg.Update(instance)
5312
5313       # ok, we created the new LVs, so now we know we have the needed
5314       # storage; as such, we proceed on the target node to rename
5315       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5316       # using the assumption that logical_id == physical_id (which in
5317       # turn is the unique_id on that node)
5318
5319       # FIXME(iustin): use a better name for the replaced LVs
5320       temp_suffix = int(time.time())
5321       ren_fn = lambda d, suff: (d.physical_id[0],
5322                                 d.physical_id[1] + "_replaced-%s" % suff)
5323       # build the rename list based on what LVs exist on the node
5324       rlist = []
5325       for to_ren in old_lvs:
5326         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5327         if not result.fail_msg and result.payload:
5328           # device exists
5329           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5330
5331       info("renaming the old LVs on the target node")
5332       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5333       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5334       # now we rename the new LVs to the old LVs
5335       info("renaming the new LVs on the target node")
5336       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5337       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5338       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5339
5340       for old, new in zip(old_lvs, new_lvs):
5341         new.logical_id = old.logical_id
5342         cfg.SetDiskID(new, tgt_node)
5343
5344       for disk in old_lvs:
5345         disk.logical_id = ren_fn(disk, temp_suffix)
5346         cfg.SetDiskID(disk, tgt_node)
5347
5348       # now that the new lvs have the old name, we can add them to the device
5349       info("adding new mirror component on %s" % tgt_node)
5350       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5351       msg = result.fail_msg
5352       if msg:
5353         for new_lv in new_lvs:
5354           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5355           if msg2:
5356             warning("Can't rollback device %s: %s", dev, msg2,
5357                     hint="cleanup manually the unused logical volumes")
5358         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5359
5360       dev.children = new_lvs
5361       cfg.Update(instance)
5362
5363     # Step: wait for sync
5364
5365     # this can fail as the old devices are degraded and _WaitForSync
5366     # does a combined result over all disks, so we don't check its
5367     # return value
5368     self.proc.LogStep(5, steps_total, "sync devices")
5369     _WaitForSync(self, instance, unlock=True)
5370
5371     # so check manually all the devices
5372     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5373       cfg.SetDiskID(dev, instance.primary_node)
5374       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5375       msg = result.fail_msg
5376       if not msg and not result.payload:
5377         msg = "disk not found"
5378       if msg:
5379         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5380                                  (name, msg))
5381       if result.payload[5]:
5382         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5383
5384     # Step: remove old storage
5385     self.proc.LogStep(6, steps_total, "removing old storage")
5386     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5387       info("remove logical volumes for %s" % name)
5388       for lv in old_lvs:
5389         cfg.SetDiskID(lv, tgt_node)
5390         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5391         if msg:
5392           warning("Can't remove old LV: %s" % msg,
5393                   hint="manually remove unused LVs")
5394           continue
5395
5396   def _ExecD8Secondary(self, feedback_fn):
5397     """Replace the secondary node for drbd8.
5398
5399     The algorithm for replace is quite complicated:
5400       - for all disks of the instance:
5401         - create new LVs on the new node with same names
5402         - shutdown the drbd device on the old secondary
5403         - disconnect the drbd network on the primary
5404         - create the drbd device on the new secondary
5405         - network attach the drbd on the primary, using an artifice:
5406           the drbd code for Attach() will connect to the network if it
5407           finds a device which is connected to the good local disks but
5408           not network enabled
5409       - wait for sync across all devices
5410       - remove all disks from the old secondary
5411
5412     Failures are not very well handled.
5413
5414     """
5415     steps_total = 6
5416     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5417     instance = self.instance
5418     iv_names = {}
5419     # start of work
5420     cfg = self.cfg
5421     old_node = self.tgt_node
5422     new_node = self.new_node
5423     pri_node = instance.primary_node
5424     nodes_ip = {
5425       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5426       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5427       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5428       }
5429
5430     # Step: check device activation
5431     self.proc.LogStep(1, steps_total, "check device existence")
5432     info("checking volume groups")
5433     my_vg = cfg.GetVGName()
5434     results = self.rpc.call_vg_list([pri_node, new_node])
5435     for node in pri_node, new_node:
5436       res = results[node]
5437       res.Raise("Error checking node %s" % node)
5438       if my_vg not in res.payload:
5439         raise errors.OpExecError("Volume group '%s' not found on %s" %
5440                                  (my_vg, node))
5441     for idx, dev in enumerate(instance.disks):
5442       if idx not in self.op.disks:
5443         continue
5444       info("checking disk/%d on %s" % (idx, pri_node))
5445       cfg.SetDiskID(dev, pri_node)
5446       result = self.rpc.call_blockdev_find(pri_node, dev)
5447       msg = result.fail_msg
5448       if not msg and not result.payload:
5449         msg = "disk not found"
5450       if msg:
5451         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5452                                  (idx, pri_node, msg))
5453
5454     # Step: check other node consistency
5455     self.proc.LogStep(2, steps_total, "check peer consistency")
5456     for idx, dev in enumerate(instance.disks):
5457       if idx not in self.op.disks:
5458         continue
5459       info("checking disk/%d consistency on %s" % (idx, pri_node))
5460       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5461         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5462                                  " unsafe to replace the secondary" %
5463                                  pri_node)
5464
5465     # Step: create new storage
5466     self.proc.LogStep(3, steps_total, "allocate new storage")
5467     for idx, dev in enumerate(instance.disks):
5468       info("adding new local storage on %s for disk/%d" %
5469            (new_node, idx))
5470       # we pass force_create=True to force LVM creation
5471       for new_lv in dev.children:
5472         _CreateBlockDev(self, new_node, instance, new_lv, True,
5473                         _GetInstanceInfoText(instance), False)
5474
5475     # Step 4: dbrd minors and drbd setups changes
5476     # after this, we must manually remove the drbd minors on both the
5477     # error and the success paths
5478     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5479                                    instance.name)
5480     logging.debug("Allocated minors %s" % (minors,))
5481     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5482     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5483       size = dev.size
5484       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5485       # create new devices on new_node; note that we create two IDs:
5486       # one without port, so the drbd will be activated without
5487       # networking information on the new node at this stage, and one
5488       # with network, for the latter activation in step 4
5489       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5490       if pri_node == o_node1:
5491         p_minor = o_minor1
5492       else:
5493         p_minor = o_minor2
5494
5495       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5496       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5497
5498       iv_names[idx] = (dev, dev.children, new_net_id)
5499       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5500                     new_net_id)
5501       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5502                               logical_id=new_alone_id,
5503                               children=dev.children,
5504                               size=dev.size)
5505       try:
5506         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5507                               _GetInstanceInfoText(instance), False)
5508       except errors.GenericError:
5509         self.cfg.ReleaseDRBDMinors(instance.name)
5510         raise
5511
5512     for idx, dev in enumerate(instance.disks):
5513       # we have new devices, shutdown the drbd on the old secondary
5514       info("shutting down drbd for disk/%d on old node" % idx)
5515       cfg.SetDiskID(dev, old_node)
5516       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5517       if msg:
5518         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5519                 (idx, msg),
5520                 hint="Please cleanup this device manually as soon as possible")
5521
5522     info("detaching primary drbds from the network (=> standalone)")
5523     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5524                                                instance.disks)[pri_node]
5525
5526     msg = result.fail_msg
5527     if msg:
5528       # detaches didn't succeed (unlikely)
5529       self.cfg.ReleaseDRBDMinors(instance.name)
5530       raise errors.OpExecError("Can't detach the disks from the network on"
5531                                " old node: %s" % (msg,))
5532
5533     # if we managed to detach at least one, we update all the disks of
5534     # the instance to point to the new secondary
5535     info("updating instance configuration")
5536     for dev, _, new_logical_id in iv_names.itervalues():
5537       dev.logical_id = new_logical_id
5538       cfg.SetDiskID(dev, pri_node)
5539     cfg.Update(instance)
5540
5541     # and now perform the drbd attach
5542     info("attaching primary drbds to new secondary (standalone => connected)")
5543     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5544                                            instance.disks, instance.name,
5545                                            False)
5546     for to_node, to_result in result.items():
5547       msg = to_result.fail_msg
5548       if msg:
5549         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5550                 hint="please do a gnt-instance info to see the"
5551                 " status of disks")
5552
5553     # this can fail as the old devices are degraded and _WaitForSync
5554     # does a combined result over all disks, so we don't check its
5555     # return value
5556     self.proc.LogStep(5, steps_total, "sync devices")
5557     _WaitForSync(self, instance, unlock=True)
5558
5559     # so check manually all the devices
5560     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5561       cfg.SetDiskID(dev, pri_node)
5562       result = self.rpc.call_blockdev_find(pri_node, dev)
5563       msg = result.fail_msg
5564       if not msg and not result.payload:
5565         msg = "disk not found"
5566       if msg:
5567         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5568                                  (idx, msg))
5569       if result.payload[5]:
5570         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5571
5572     self.proc.LogStep(6, steps_total, "removing old storage")
5573     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5574       info("remove logical volumes for disk/%d" % idx)
5575       for lv in old_lvs:
5576         cfg.SetDiskID(lv, old_node)
5577         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5578         if msg:
5579           warning("Can't remove LV on old secondary: %s", msg,
5580                   hint="Cleanup stale volumes by hand")
5581
5582   def Exec(self, feedback_fn):
5583     """Execute disk replacement.
5584
5585     This dispatches the disk replacement to the appropriate handler.
5586
5587     """
5588     instance = self.instance
5589
5590     # Activate the instance disks if we're replacing them on a down instance
5591     if not instance.admin_up:
5592       _StartInstanceDisks(self, instance, True)
5593
5594     if self.op.mode == constants.REPLACE_DISK_CHG:
5595       fn = self._ExecD8Secondary
5596     else:
5597       fn = self._ExecD8DiskOnly
5598
5599     ret = fn(feedback_fn)
5600
5601     # Deactivate the instance disks if we're replacing them on a down instance
5602     if not instance.admin_up:
5603       _SafeShutdownInstanceDisks(self, instance)
5604
5605     return ret
5606
5607
5608 class LUGrowDisk(LogicalUnit):
5609   """Grow a disk of an instance.
5610
5611   """
5612   HPATH = "disk-grow"
5613   HTYPE = constants.HTYPE_INSTANCE
5614   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5615   REQ_BGL = False
5616
5617   def ExpandNames(self):
5618     self._ExpandAndLockInstance()
5619     self.needed_locks[locking.LEVEL_NODE] = []
5620     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5621
5622   def DeclareLocks(self, level):
5623     if level == locking.LEVEL_NODE:
5624       self._LockInstancesNodes()
5625
5626   def BuildHooksEnv(self):
5627     """Build hooks env.
5628
5629     This runs on the master, the primary and all the secondaries.
5630
5631     """
5632     env = {
5633       "DISK": self.op.disk,
5634       "AMOUNT": self.op.amount,
5635       }
5636     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5637     nl = [
5638       self.cfg.GetMasterNode(),
5639       self.instance.primary_node,
5640       ]
5641     return env, nl, nl
5642
5643   def CheckPrereq(self):
5644     """Check prerequisites.
5645
5646     This checks that the instance is in the cluster.
5647
5648     """
5649     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5650     assert instance is not None, \
5651       "Cannot retrieve locked instance %s" % self.op.instance_name
5652     nodenames = list(instance.all_nodes)
5653     for node in nodenames:
5654       _CheckNodeOnline(self, node)
5655
5656
5657     self.instance = instance
5658
5659     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5660       raise errors.OpPrereqError("Instance's disk layout does not support"
5661                                  " growing.")
5662
5663     self.disk = instance.FindDisk(self.op.disk)
5664
5665     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5666                                        instance.hypervisor)
5667     for node in nodenames:
5668       info = nodeinfo[node]
5669       info.Raise("Cannot get current information from node %s" % node)
5670       vg_free = info.payload.get('vg_free', None)
5671       if not isinstance(vg_free, int):
5672         raise errors.OpPrereqError("Can't compute free disk space on"
5673                                    " node %s" % node)
5674       if self.op.amount > vg_free:
5675         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5676                                    " %d MiB available, %d MiB required" %
5677                                    (node, vg_free, self.op.amount))
5678
5679   def Exec(self, feedback_fn):
5680     """Execute disk grow.
5681
5682     """
5683     instance = self.instance
5684     disk = self.disk
5685     for node in instance.all_nodes:
5686       self.cfg.SetDiskID(disk, node)
5687       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5688       result.Raise("Grow request failed to node %s" % node)
5689     disk.RecordGrow(self.op.amount)
5690     self.cfg.Update(instance)
5691     if self.op.wait_for_sync:
5692       disk_abort = not _WaitForSync(self, instance)
5693       if disk_abort:
5694         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5695                              " status.\nPlease check the instance.")
5696
5697
5698 class LUQueryInstanceData(NoHooksLU):
5699   """Query runtime instance data.
5700
5701   """
5702   _OP_REQP = ["instances", "static"]
5703   REQ_BGL = False
5704
5705   def ExpandNames(self):
5706     self.needed_locks = {}
5707     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5708
5709     if not isinstance(self.op.instances, list):
5710       raise errors.OpPrereqError("Invalid argument type 'instances'")
5711
5712     if self.op.instances:
5713       self.wanted_names = []
5714       for name in self.op.instances:
5715         full_name = self.cfg.ExpandInstanceName(name)
5716         if full_name is None:
5717           raise errors.OpPrereqError("Instance '%s' not known" % name)
5718         self.wanted_names.append(full_name)
5719       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5720     else:
5721       self.wanted_names = None
5722       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5723
5724     self.needed_locks[locking.LEVEL_NODE] = []
5725     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5726
5727   def DeclareLocks(self, level):
5728     if level == locking.LEVEL_NODE:
5729       self._LockInstancesNodes()
5730
5731   def CheckPrereq(self):
5732     """Check prerequisites.
5733
5734     This only checks the optional instance list against the existing names.
5735
5736     """
5737     if self.wanted_names is None:
5738       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5739
5740     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5741                              in self.wanted_names]
5742     return
5743
5744   def _ComputeDiskStatus(self, instance, snode, dev):
5745     """Compute block device status.
5746
5747     """
5748     static = self.op.static
5749     if not static:
5750       self.cfg.SetDiskID(dev, instance.primary_node)
5751       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5752       if dev_pstatus.offline:
5753         dev_pstatus = None
5754       else:
5755         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5756         dev_pstatus = dev_pstatus.payload
5757     else:
5758       dev_pstatus = None
5759
5760     if dev.dev_type in constants.LDS_DRBD:
5761       # we change the snode then (otherwise we use the one passed in)
5762       if dev.logical_id[0] == instance.primary_node:
5763         snode = dev.logical_id[1]
5764       else:
5765         snode = dev.logical_id[0]
5766
5767     if snode and not static:
5768       self.cfg.SetDiskID(dev, snode)
5769       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5770       if dev_sstatus.offline:
5771         dev_sstatus = None
5772       else:
5773         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5774         dev_sstatus = dev_sstatus.payload
5775     else:
5776       dev_sstatus = None
5777
5778     if dev.children:
5779       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5780                       for child in dev.children]
5781     else:
5782       dev_children = []
5783
5784     data = {
5785       "iv_name": dev.iv_name,
5786       "dev_type": dev.dev_type,
5787       "logical_id": dev.logical_id,
5788       "physical_id": dev.physical_id,
5789       "pstatus": dev_pstatus,
5790       "sstatus": dev_sstatus,
5791       "children": dev_children,
5792       "mode": dev.mode,
5793       }
5794
5795     return data
5796
5797   def Exec(self, feedback_fn):
5798     """Gather and return data"""
5799     result = {}
5800
5801     cluster = self.cfg.GetClusterInfo()
5802
5803     for instance in self.wanted_instances:
5804       if not self.op.static:
5805         remote_info = self.rpc.call_instance_info(instance.primary_node,
5806                                                   instance.name,
5807                                                   instance.hypervisor)
5808         remote_info.Raise("Error checking node %s" % instance.primary_node)
5809         remote_info = remote_info.payload
5810         if remote_info and "state" in remote_info:
5811           remote_state = "up"
5812         else:
5813           remote_state = "down"
5814       else:
5815         remote_state = None
5816       if instance.admin_up:
5817         config_state = "up"
5818       else:
5819         config_state = "down"
5820
5821       disks = [self._ComputeDiskStatus(instance, None, device)
5822                for device in instance.disks]
5823
5824       idict = {
5825         "name": instance.name,
5826         "config_state": config_state,
5827         "run_state": remote_state,
5828         "pnode": instance.primary_node,
5829         "snodes": instance.secondary_nodes,
5830         "os": instance.os,
5831         # this happens to be the same format used for hooks
5832         "nics": _NICListToTuple(self, instance.nics),
5833         "disks": disks,
5834         "hypervisor": instance.hypervisor,
5835         "network_port": instance.network_port,
5836         "hv_instance": instance.hvparams,
5837         "hv_actual": cluster.FillHV(instance),
5838         "be_instance": instance.beparams,
5839         "be_actual": cluster.FillBE(instance),
5840         }
5841
5842       result[instance.name] = idict
5843
5844     return result
5845
5846
5847 class LUSetInstanceParams(LogicalUnit):
5848   """Modifies an instances's parameters.
5849
5850   """
5851   HPATH = "instance-modify"
5852   HTYPE = constants.HTYPE_INSTANCE
5853   _OP_REQP = ["instance_name"]
5854   REQ_BGL = False
5855
5856   def CheckArguments(self):
5857     if not hasattr(self.op, 'nics'):
5858       self.op.nics = []
5859     if not hasattr(self.op, 'disks'):
5860       self.op.disks = []
5861     if not hasattr(self.op, 'beparams'):
5862       self.op.beparams = {}
5863     if not hasattr(self.op, 'hvparams'):
5864       self.op.hvparams = {}
5865     self.op.force = getattr(self.op, "force", False)
5866     if not (self.op.nics or self.op.disks or
5867             self.op.hvparams or self.op.beparams):
5868       raise errors.OpPrereqError("No changes submitted")
5869
5870     # Disk validation
5871     disk_addremove = 0
5872     for disk_op, disk_dict in self.op.disks:
5873       if disk_op == constants.DDM_REMOVE:
5874         disk_addremove += 1
5875         continue
5876       elif disk_op == constants.DDM_ADD:
5877         disk_addremove += 1
5878       else:
5879         if not isinstance(disk_op, int):
5880           raise errors.OpPrereqError("Invalid disk index")
5881       if disk_op == constants.DDM_ADD:
5882         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5883         if mode not in constants.DISK_ACCESS_SET:
5884           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5885         size = disk_dict.get('size', None)
5886         if size is None:
5887           raise errors.OpPrereqError("Required disk parameter size missing")
5888         try:
5889           size = int(size)
5890         except ValueError, err:
5891           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5892                                      str(err))
5893         disk_dict['size'] = size
5894       else:
5895         # modification of disk
5896         if 'size' in disk_dict:
5897           raise errors.OpPrereqError("Disk size change not possible, use"
5898                                      " grow-disk")
5899
5900     if disk_addremove > 1:
5901       raise errors.OpPrereqError("Only one disk add or remove operation"
5902                                  " supported at a time")
5903
5904     # NIC validation
5905     nic_addremove = 0
5906     for nic_op, nic_dict in self.op.nics:
5907       if nic_op == constants.DDM_REMOVE:
5908         nic_addremove += 1
5909         continue
5910       elif nic_op == constants.DDM_ADD:
5911         nic_addremove += 1
5912       else:
5913         if not isinstance(nic_op, int):
5914           raise errors.OpPrereqError("Invalid nic index")
5915
5916       # nic_dict should be a dict
5917       nic_ip = nic_dict.get('ip', None)
5918       if nic_ip is not None:
5919         if nic_ip.lower() == constants.VALUE_NONE:
5920           nic_dict['ip'] = None
5921         else:
5922           if not utils.IsValidIP(nic_ip):
5923             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5924
5925       nic_bridge = nic_dict.get('bridge', None)
5926       nic_link = nic_dict.get('link', None)
5927       if nic_bridge and nic_link:
5928         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5929                                    " at the same time")
5930       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5931         nic_dict['bridge'] = None
5932       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5933         nic_dict['link'] = None
5934
5935       if nic_op == constants.DDM_ADD:
5936         nic_mac = nic_dict.get('mac', None)
5937         if nic_mac is None:
5938           nic_dict['mac'] = constants.VALUE_AUTO
5939
5940       if 'mac' in nic_dict:
5941         nic_mac = nic_dict['mac']
5942         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5943           if not utils.IsValidMac(nic_mac):
5944             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5945         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5946           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5947                                      " modifying an existing nic")
5948
5949     if nic_addremove > 1:
5950       raise errors.OpPrereqError("Only one NIC add or remove operation"
5951                                  " supported at a time")
5952
5953   def ExpandNames(self):
5954     self._ExpandAndLockInstance()
5955     self.needed_locks[locking.LEVEL_NODE] = []
5956     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5957
5958   def DeclareLocks(self, level):
5959     if level == locking.LEVEL_NODE:
5960       self._LockInstancesNodes()
5961
5962   def BuildHooksEnv(self):
5963     """Build hooks env.
5964
5965     This runs on the master, primary and secondaries.
5966
5967     """
5968     args = dict()
5969     if constants.BE_MEMORY in self.be_new:
5970       args['memory'] = self.be_new[constants.BE_MEMORY]
5971     if constants.BE_VCPUS in self.be_new:
5972       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5973     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5974     # information at all.
5975     if self.op.nics:
5976       args['nics'] = []
5977       nic_override = dict(self.op.nics)
5978       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5979       for idx, nic in enumerate(self.instance.nics):
5980         if idx in nic_override:
5981           this_nic_override = nic_override[idx]
5982         else:
5983           this_nic_override = {}
5984         if 'ip' in this_nic_override:
5985           ip = this_nic_override['ip']
5986         else:
5987           ip = nic.ip
5988         if 'mac' in this_nic_override:
5989           mac = this_nic_override['mac']
5990         else:
5991           mac = nic.mac
5992         if idx in self.nic_pnew:
5993           nicparams = self.nic_pnew[idx]
5994         else:
5995           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5996         mode = nicparams[constants.NIC_MODE]
5997         link = nicparams[constants.NIC_LINK]
5998         args['nics'].append((ip, mac, mode, link))
5999       if constants.DDM_ADD in nic_override:
6000         ip = nic_override[constants.DDM_ADD].get('ip', None)
6001         mac = nic_override[constants.DDM_ADD]['mac']
6002         nicparams = self.nic_pnew[constants.DDM_ADD]
6003         mode = nicparams[constants.NIC_MODE]
6004         link = nicparams[constants.NIC_LINK]
6005         args['nics'].append((ip, mac, mode, link))
6006       elif constants.DDM_REMOVE in nic_override:
6007         del args['nics'][-1]
6008
6009     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6010     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6011     return env, nl, nl
6012
6013   def _GetUpdatedParams(self, old_params, update_dict,
6014                         default_values, parameter_types):
6015     """Return the new params dict for the given params.
6016
6017     @type old_params: dict
6018     @type old_params: old parameters
6019     @type update_dict: dict
6020     @type update_dict: dict containing new parameter values,
6021                        or constants.VALUE_DEFAULT to reset the
6022                        parameter to its default value
6023     @type default_values: dict
6024     @param default_values: default values for the filled parameters
6025     @type parameter_types: dict
6026     @param parameter_types: dict mapping target dict keys to types
6027                             in constants.ENFORCEABLE_TYPES
6028     @rtype: (dict, dict)
6029     @return: (new_parameters, filled_parameters)
6030
6031     """
6032     params_copy = copy.deepcopy(old_params)
6033     for key, val in update_dict.iteritems():
6034       if val == constants.VALUE_DEFAULT:
6035         try:
6036           del params_copy[key]
6037         except KeyError:
6038           pass
6039       else:
6040         params_copy[key] = val
6041     utils.ForceDictType(params_copy, parameter_types)
6042     params_filled = objects.FillDict(default_values, params_copy)
6043     return (params_copy, params_filled)
6044
6045   def CheckPrereq(self):
6046     """Check prerequisites.
6047
6048     This only checks the instance list against the existing names.
6049
6050     """
6051     force = self.force = self.op.force
6052
6053     # checking the new params on the primary/secondary nodes
6054
6055     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6056     cluster = self.cluster = self.cfg.GetClusterInfo()
6057     assert self.instance is not None, \
6058       "Cannot retrieve locked instance %s" % self.op.instance_name
6059     pnode = instance.primary_node
6060     nodelist = list(instance.all_nodes)
6061
6062     # hvparams processing
6063     if self.op.hvparams:
6064       i_hvdict, hv_new = self._GetUpdatedParams(
6065                              instance.hvparams, self.op.hvparams,
6066                              cluster.hvparams[instance.hypervisor],
6067                              constants.HVS_PARAMETER_TYPES)
6068       # local check
6069       hypervisor.GetHypervisor(
6070         instance.hypervisor).CheckParameterSyntax(hv_new)
6071       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6072       self.hv_new = hv_new # the new actual values
6073       self.hv_inst = i_hvdict # the new dict (without defaults)
6074     else:
6075       self.hv_new = self.hv_inst = {}
6076
6077     # beparams processing
6078     if self.op.beparams:
6079       i_bedict, be_new = self._GetUpdatedParams(
6080                              instance.beparams, self.op.beparams,
6081                              cluster.beparams[constants.PP_DEFAULT],
6082                              constants.BES_PARAMETER_TYPES)
6083       self.be_new = be_new # the new actual values
6084       self.be_inst = i_bedict # the new dict (without defaults)
6085     else:
6086       self.be_new = self.be_inst = {}
6087
6088     self.warn = []
6089
6090     if constants.BE_MEMORY in self.op.beparams and not self.force:
6091       mem_check_list = [pnode]
6092       if be_new[constants.BE_AUTO_BALANCE]:
6093         # either we changed auto_balance to yes or it was from before
6094         mem_check_list.extend(instance.secondary_nodes)
6095       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6096                                                   instance.hypervisor)
6097       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6098                                          instance.hypervisor)
6099       pninfo = nodeinfo[pnode]
6100       msg = pninfo.fail_msg
6101       if msg:
6102         # Assume the primary node is unreachable and go ahead
6103         self.warn.append("Can't get info from primary node %s: %s" %
6104                          (pnode,  msg))
6105       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6106         self.warn.append("Node data from primary node %s doesn't contain"
6107                          " free memory information" % pnode)
6108       elif instance_info.fail_msg:
6109         self.warn.append("Can't get instance runtime information: %s" %
6110                         instance_info.fail_msg)
6111       else:
6112         if instance_info.payload:
6113           current_mem = int(instance_info.payload['memory'])
6114         else:
6115           # Assume instance not running
6116           # (there is a slight race condition here, but it's not very probable,
6117           # and we have no other way to check)
6118           current_mem = 0
6119         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6120                     pninfo.payload['memory_free'])
6121         if miss_mem > 0:
6122           raise errors.OpPrereqError("This change will prevent the instance"
6123                                      " from starting, due to %d MB of memory"
6124                                      " missing on its primary node" % miss_mem)
6125
6126       if be_new[constants.BE_AUTO_BALANCE]:
6127         for node, nres in nodeinfo.items():
6128           if node not in instance.secondary_nodes:
6129             continue
6130           msg = nres.fail_msg
6131           if msg:
6132             self.warn.append("Can't get info from secondary node %s: %s" %
6133                              (node, msg))
6134           elif not isinstance(nres.payload.get('memory_free', None), int):
6135             self.warn.append("Secondary node %s didn't return free"
6136                              " memory information" % node)
6137           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6138             self.warn.append("Not enough memory to failover instance to"
6139                              " secondary node %s" % node)
6140
6141     # NIC processing
6142     self.nic_pnew = {}
6143     self.nic_pinst = {}
6144     for nic_op, nic_dict in self.op.nics:
6145       if nic_op == constants.DDM_REMOVE:
6146         if not instance.nics:
6147           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6148         continue
6149       if nic_op != constants.DDM_ADD:
6150         # an existing nic
6151         if nic_op < 0 or nic_op >= len(instance.nics):
6152           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6153                                      " are 0 to %d" %
6154                                      (nic_op, len(instance.nics)))
6155         old_nic_params = instance.nics[nic_op].nicparams
6156         old_nic_ip = instance.nics[nic_op].ip
6157       else:
6158         old_nic_params = {}
6159         old_nic_ip = None
6160
6161       update_params_dict = dict([(key, nic_dict[key])
6162                                  for key in constants.NICS_PARAMETERS
6163                                  if key in nic_dict])
6164
6165       if 'bridge' in nic_dict:
6166         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6167
6168       new_nic_params, new_filled_nic_params = \
6169           self._GetUpdatedParams(old_nic_params, update_params_dict,
6170                                  cluster.nicparams[constants.PP_DEFAULT],
6171                                  constants.NICS_PARAMETER_TYPES)
6172       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6173       self.nic_pinst[nic_op] = new_nic_params
6174       self.nic_pnew[nic_op] = new_filled_nic_params
6175       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6176
6177       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6178         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6179         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6180         if msg:
6181           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6182           if self.force:
6183             self.warn.append(msg)
6184           else:
6185             raise errors.OpPrereqError(msg)
6186       if new_nic_mode == constants.NIC_MODE_ROUTED:
6187         if 'ip' in nic_dict:
6188           nic_ip = nic_dict['ip']
6189         else:
6190           nic_ip = old_nic_ip
6191         if nic_ip is None:
6192           raise errors.OpPrereqError('Cannot set the nic ip to None'
6193                                      ' on a routed nic')
6194       if 'mac' in nic_dict:
6195         nic_mac = nic_dict['mac']
6196         if nic_mac is None:
6197           raise errors.OpPrereqError('Cannot set the nic mac to None')
6198         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6199           # otherwise generate the mac
6200           nic_dict['mac'] = self.cfg.GenerateMAC()
6201         else:
6202           # or validate/reserve the current one
6203           if self.cfg.IsMacInUse(nic_mac):
6204             raise errors.OpPrereqError("MAC address %s already in use"
6205                                        " in cluster" % nic_mac)
6206
6207     # DISK processing
6208     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6209       raise errors.OpPrereqError("Disk operations not supported for"
6210                                  " diskless instances")
6211     for disk_op, disk_dict in self.op.disks:
6212       if disk_op == constants.DDM_REMOVE:
6213         if len(instance.disks) == 1:
6214           raise errors.OpPrereqError("Cannot remove the last disk of"
6215                                      " an instance")
6216         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6217         ins_l = ins_l[pnode]
6218         msg = ins_l.fail_msg
6219         if msg:
6220           raise errors.OpPrereqError("Can't contact node %s: %s" %
6221                                      (pnode, msg))
6222         if instance.name in ins_l.payload:
6223           raise errors.OpPrereqError("Instance is running, can't remove"
6224                                      " disks.")
6225
6226       if (disk_op == constants.DDM_ADD and
6227           len(instance.nics) >= constants.MAX_DISKS):
6228         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6229                                    " add more" % constants.MAX_DISKS)
6230       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6231         # an existing disk
6232         if disk_op < 0 or disk_op >= len(instance.disks):
6233           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6234                                      " are 0 to %d" %
6235                                      (disk_op, len(instance.disks)))
6236
6237     return
6238
6239   def Exec(self, feedback_fn):
6240     """Modifies an instance.
6241
6242     All parameters take effect only at the next restart of the instance.
6243
6244     """
6245     # Process here the warnings from CheckPrereq, as we don't have a
6246     # feedback_fn there.
6247     for warn in self.warn:
6248       feedback_fn("WARNING: %s" % warn)
6249
6250     result = []
6251     instance = self.instance
6252     cluster = self.cluster
6253     # disk changes
6254     for disk_op, disk_dict in self.op.disks:
6255       if disk_op == constants.DDM_REMOVE:
6256         # remove the last disk
6257         device = instance.disks.pop()
6258         device_idx = len(instance.disks)
6259         for node, disk in device.ComputeNodeTree(instance.primary_node):
6260           self.cfg.SetDiskID(disk, node)
6261           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6262           if msg:
6263             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6264                             " continuing anyway", device_idx, node, msg)
6265         result.append(("disk/%d" % device_idx, "remove"))
6266       elif disk_op == constants.DDM_ADD:
6267         # add a new disk
6268         if instance.disk_template == constants.DT_FILE:
6269           file_driver, file_path = instance.disks[0].logical_id
6270           file_path = os.path.dirname(file_path)
6271         else:
6272           file_driver = file_path = None
6273         disk_idx_base = len(instance.disks)
6274         new_disk = _GenerateDiskTemplate(self,
6275                                          instance.disk_template,
6276                                          instance.name, instance.primary_node,
6277                                          instance.secondary_nodes,
6278                                          [disk_dict],
6279                                          file_path,
6280                                          file_driver,
6281                                          disk_idx_base)[0]
6282         instance.disks.append(new_disk)
6283         info = _GetInstanceInfoText(instance)
6284
6285         logging.info("Creating volume %s for instance %s",
6286                      new_disk.iv_name, instance.name)
6287         # Note: this needs to be kept in sync with _CreateDisks
6288         #HARDCODE
6289         for node in instance.all_nodes:
6290           f_create = node == instance.primary_node
6291           try:
6292             _CreateBlockDev(self, node, instance, new_disk,
6293                             f_create, info, f_create)
6294           except errors.OpExecError, err:
6295             self.LogWarning("Failed to create volume %s (%s) on"
6296                             " node %s: %s",
6297                             new_disk.iv_name, new_disk, node, err)
6298         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6299                        (new_disk.size, new_disk.mode)))
6300       else:
6301         # change a given disk
6302         instance.disks[disk_op].mode = disk_dict['mode']
6303         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6304     # NIC changes
6305     for nic_op, nic_dict in self.op.nics:
6306       if nic_op == constants.DDM_REMOVE:
6307         # remove the last nic
6308         del instance.nics[-1]
6309         result.append(("nic.%d" % len(instance.nics), "remove"))
6310       elif nic_op == constants.DDM_ADD:
6311         # mac and bridge should be set, by now
6312         mac = nic_dict['mac']
6313         ip = nic_dict.get('ip', None)
6314         nicparams = self.nic_pinst[constants.DDM_ADD]
6315         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6316         instance.nics.append(new_nic)
6317         result.append(("nic.%d" % (len(instance.nics) - 1),
6318                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6319                        (new_nic.mac, new_nic.ip,
6320                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6321                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6322                        )))
6323       else:
6324         for key in 'mac', 'ip':
6325           if key in nic_dict:
6326             setattr(instance.nics[nic_op], key, nic_dict[key])
6327         if nic_op in self.nic_pnew:
6328           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6329         for key, val in nic_dict.iteritems():
6330           result.append(("nic.%s/%d" % (key, nic_op), val))
6331
6332     # hvparams changes
6333     if self.op.hvparams:
6334       instance.hvparams = self.hv_inst
6335       for key, val in self.op.hvparams.iteritems():
6336         result.append(("hv/%s" % key, val))
6337
6338     # beparams changes
6339     if self.op.beparams:
6340       instance.beparams = self.be_inst
6341       for key, val in self.op.beparams.iteritems():
6342         result.append(("be/%s" % key, val))
6343
6344     self.cfg.Update(instance)
6345
6346     return result
6347
6348
6349 class LUQueryExports(NoHooksLU):
6350   """Query the exports list
6351
6352   """
6353   _OP_REQP = ['nodes']
6354   REQ_BGL = False
6355
6356   def ExpandNames(self):
6357     self.needed_locks = {}
6358     self.share_locks[locking.LEVEL_NODE] = 1
6359     if not self.op.nodes:
6360       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6361     else:
6362       self.needed_locks[locking.LEVEL_NODE] = \
6363         _GetWantedNodes(self, self.op.nodes)
6364
6365   def CheckPrereq(self):
6366     """Check prerequisites.
6367
6368     """
6369     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6370
6371   def Exec(self, feedback_fn):
6372     """Compute the list of all the exported system images.
6373
6374     @rtype: dict
6375     @return: a dictionary with the structure node->(export-list)
6376         where export-list is a list of the instances exported on
6377         that node.
6378
6379     """
6380     rpcresult = self.rpc.call_export_list(self.nodes)
6381     result = {}
6382     for node in rpcresult:
6383       if rpcresult[node].fail_msg:
6384         result[node] = False
6385       else:
6386         result[node] = rpcresult[node].payload
6387
6388     return result
6389
6390
6391 class LUExportInstance(LogicalUnit):
6392   """Export an instance to an image in the cluster.
6393
6394   """
6395   HPATH = "instance-export"
6396   HTYPE = constants.HTYPE_INSTANCE
6397   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6398   REQ_BGL = False
6399
6400   def ExpandNames(self):
6401     self._ExpandAndLockInstance()
6402     # FIXME: lock only instance primary and destination node
6403     #
6404     # Sad but true, for now we have do lock all nodes, as we don't know where
6405     # the previous export might be, and and in this LU we search for it and
6406     # remove it from its current node. In the future we could fix this by:
6407     #  - making a tasklet to search (share-lock all), then create the new one,
6408     #    then one to remove, after
6409     #  - removing the removal operation altoghether
6410     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6411
6412   def DeclareLocks(self, level):
6413     """Last minute lock declaration."""
6414     # All nodes are locked anyway, so nothing to do here.
6415
6416   def BuildHooksEnv(self):
6417     """Build hooks env.
6418
6419     This will run on the master, primary node and target node.
6420
6421     """
6422     env = {
6423       "EXPORT_NODE": self.op.target_node,
6424       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6425       }
6426     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6427     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6428           self.op.target_node]
6429     return env, nl, nl
6430
6431   def CheckPrereq(self):
6432     """Check prerequisites.
6433
6434     This checks that the instance and node names are valid.
6435
6436     """
6437     instance_name = self.op.instance_name
6438     self.instance = self.cfg.GetInstanceInfo(instance_name)
6439     assert self.instance is not None, \
6440           "Cannot retrieve locked instance %s" % self.op.instance_name
6441     _CheckNodeOnline(self, self.instance.primary_node)
6442
6443     self.dst_node = self.cfg.GetNodeInfo(
6444       self.cfg.ExpandNodeName(self.op.target_node))
6445
6446     if self.dst_node is None:
6447       # This is wrong node name, not a non-locked node
6448       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6449     _CheckNodeOnline(self, self.dst_node.name)
6450     _CheckNodeNotDrained(self, self.dst_node.name)
6451
6452     # instance disk type verification
6453     for disk in self.instance.disks:
6454       if disk.dev_type == constants.LD_FILE:
6455         raise errors.OpPrereqError("Export not supported for instances with"
6456                                    " file-based disks")
6457
6458   def Exec(self, feedback_fn):
6459     """Export an instance to an image in the cluster.
6460
6461     """
6462     instance = self.instance
6463     dst_node = self.dst_node
6464     src_node = instance.primary_node
6465     if self.op.shutdown:
6466       # shutdown the instance, but not the disks
6467       result = self.rpc.call_instance_shutdown(src_node, instance)
6468       result.Raise("Could not shutdown instance %s on"
6469                    " node %s" % (instance.name, src_node))
6470
6471     vgname = self.cfg.GetVGName()
6472
6473     snap_disks = []
6474
6475     # set the disks ID correctly since call_instance_start needs the
6476     # correct drbd minor to create the symlinks
6477     for disk in instance.disks:
6478       self.cfg.SetDiskID(disk, src_node)
6479
6480     try:
6481       for idx, disk in enumerate(instance.disks):
6482         # result.payload will be a snapshot of an lvm leaf of the one we passed
6483         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6484         msg = result.fail_msg
6485         if msg:
6486           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6487                           idx, src_node, msg)
6488           snap_disks.append(False)
6489         else:
6490           disk_id = (vgname, result.payload)
6491           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6492                                  logical_id=disk_id, physical_id=disk_id,
6493                                  iv_name=disk.iv_name)
6494           snap_disks.append(new_dev)
6495
6496     finally:
6497       if self.op.shutdown and instance.admin_up:
6498         result = self.rpc.call_instance_start(src_node, instance, None, None)
6499         msg = result.fail_msg
6500         if msg:
6501           _ShutdownInstanceDisks(self, instance)
6502           raise errors.OpExecError("Could not start instance: %s" % msg)
6503
6504     # TODO: check for size
6505
6506     cluster_name = self.cfg.GetClusterName()
6507     for idx, dev in enumerate(snap_disks):
6508       if dev:
6509         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6510                                                instance, cluster_name, idx)
6511         msg = result.fail_msg
6512         if msg:
6513           self.LogWarning("Could not export disk/%s from node %s to"
6514                           " node %s: %s", idx, src_node, dst_node.name, msg)
6515         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6516         if msg:
6517           self.LogWarning("Could not remove snapshot for disk/%d from node"
6518                           " %s: %s", idx, src_node, msg)
6519
6520     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6521     msg = result.fail_msg
6522     if msg:
6523       self.LogWarning("Could not finalize export for instance %s"
6524                       " on node %s: %s", instance.name, dst_node.name, msg)
6525
6526     nodelist = self.cfg.GetNodeList()
6527     nodelist.remove(dst_node.name)
6528
6529     # on one-node clusters nodelist will be empty after the removal
6530     # if we proceed the backup would be removed because OpQueryExports
6531     # substitutes an empty list with the full cluster node list.
6532     iname = instance.name
6533     if nodelist:
6534       exportlist = self.rpc.call_export_list(nodelist)
6535       for node in exportlist:
6536         if exportlist[node].fail_msg:
6537           continue
6538         if iname in exportlist[node].payload:
6539           msg = self.rpc.call_export_remove(node, iname).fail_msg
6540           if msg:
6541             self.LogWarning("Could not remove older export for instance %s"
6542                             " on node %s: %s", iname, node, msg)
6543
6544
6545 class LURemoveExport(NoHooksLU):
6546   """Remove exports related to the named instance.
6547
6548   """
6549   _OP_REQP = ["instance_name"]
6550   REQ_BGL = False
6551
6552   def ExpandNames(self):
6553     self.needed_locks = {}
6554     # We need all nodes to be locked in order for RemoveExport to work, but we
6555     # don't need to lock the instance itself, as nothing will happen to it (and
6556     # we can remove exports also for a removed instance)
6557     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6558
6559   def CheckPrereq(self):
6560     """Check prerequisites.
6561     """
6562     pass
6563
6564   def Exec(self, feedback_fn):
6565     """Remove any export.
6566
6567     """
6568     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6569     # If the instance was not found we'll try with the name that was passed in.
6570     # This will only work if it was an FQDN, though.
6571     fqdn_warn = False
6572     if not instance_name:
6573       fqdn_warn = True
6574       instance_name = self.op.instance_name
6575
6576     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6577     exportlist = self.rpc.call_export_list(locked_nodes)
6578     found = False
6579     for node in exportlist:
6580       msg = exportlist[node].fail_msg
6581       if msg:
6582         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6583         continue
6584       if instance_name in exportlist[node].payload:
6585         found = True
6586         result = self.rpc.call_export_remove(node, instance_name)
6587         msg = result.fail_msg
6588         if msg:
6589           logging.error("Could not remove export for instance %s"
6590                         " on node %s: %s", instance_name, node, msg)
6591
6592     if fqdn_warn and not found:
6593       feedback_fn("Export not found. If trying to remove an export belonging"
6594                   " to a deleted instance please use its Fully Qualified"
6595                   " Domain Name.")
6596
6597
6598 class TagsLU(NoHooksLU):
6599   """Generic tags LU.
6600
6601   This is an abstract class which is the parent of all the other tags LUs.
6602
6603   """
6604
6605   def ExpandNames(self):
6606     self.needed_locks = {}
6607     if self.op.kind == constants.TAG_NODE:
6608       name = self.cfg.ExpandNodeName(self.op.name)
6609       if name is None:
6610         raise errors.OpPrereqError("Invalid node name (%s)" %
6611                                    (self.op.name,))
6612       self.op.name = name
6613       self.needed_locks[locking.LEVEL_NODE] = name
6614     elif self.op.kind == constants.TAG_INSTANCE:
6615       name = self.cfg.ExpandInstanceName(self.op.name)
6616       if name is None:
6617         raise errors.OpPrereqError("Invalid instance name (%s)" %
6618                                    (self.op.name,))
6619       self.op.name = name
6620       self.needed_locks[locking.LEVEL_INSTANCE] = name
6621
6622   def CheckPrereq(self):
6623     """Check prerequisites.
6624
6625     """
6626     if self.op.kind == constants.TAG_CLUSTER:
6627       self.target = self.cfg.GetClusterInfo()
6628     elif self.op.kind == constants.TAG_NODE:
6629       self.target = self.cfg.GetNodeInfo(self.op.name)
6630     elif self.op.kind == constants.TAG_INSTANCE:
6631       self.target = self.cfg.GetInstanceInfo(self.op.name)
6632     else:
6633       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6634                                  str(self.op.kind))
6635
6636
6637 class LUGetTags(TagsLU):
6638   """Returns the tags of a given object.
6639
6640   """
6641   _OP_REQP = ["kind", "name"]
6642   REQ_BGL = False
6643
6644   def Exec(self, feedback_fn):
6645     """Returns the tag list.
6646
6647     """
6648     return list(self.target.GetTags())
6649
6650
6651 class LUSearchTags(NoHooksLU):
6652   """Searches the tags for a given pattern.
6653
6654   """
6655   _OP_REQP = ["pattern"]
6656   REQ_BGL = False
6657
6658   def ExpandNames(self):
6659     self.needed_locks = {}
6660
6661   def CheckPrereq(self):
6662     """Check prerequisites.
6663
6664     This checks the pattern passed for validity by compiling it.
6665
6666     """
6667     try:
6668       self.re = re.compile(self.op.pattern)
6669     except re.error, err:
6670       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6671                                  (self.op.pattern, err))
6672
6673   def Exec(self, feedback_fn):
6674     """Returns the tag list.
6675
6676     """
6677     cfg = self.cfg
6678     tgts = [("/cluster", cfg.GetClusterInfo())]
6679     ilist = cfg.GetAllInstancesInfo().values()
6680     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6681     nlist = cfg.GetAllNodesInfo().values()
6682     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6683     results = []
6684     for path, target in tgts:
6685       for tag in target.GetTags():
6686         if self.re.search(tag):
6687           results.append((path, tag))
6688     return results
6689
6690
6691 class LUAddTags(TagsLU):
6692   """Sets a tag on a given object.
6693
6694   """
6695   _OP_REQP = ["kind", "name", "tags"]
6696   REQ_BGL = False
6697
6698   def CheckPrereq(self):
6699     """Check prerequisites.
6700
6701     This checks the type and length of the tag name and value.
6702
6703     """
6704     TagsLU.CheckPrereq(self)
6705     for tag in self.op.tags:
6706       objects.TaggableObject.ValidateTag(tag)
6707
6708   def Exec(self, feedback_fn):
6709     """Sets the tag.
6710
6711     """
6712     try:
6713       for tag in self.op.tags:
6714         self.target.AddTag(tag)
6715     except errors.TagError, err:
6716       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6717     try:
6718       self.cfg.Update(self.target)
6719     except errors.ConfigurationError:
6720       raise errors.OpRetryError("There has been a modification to the"
6721                                 " config file and the operation has been"
6722                                 " aborted. Please retry.")
6723
6724
6725 class LUDelTags(TagsLU):
6726   """Delete a list of tags from a given object.
6727
6728   """
6729   _OP_REQP = ["kind", "name", "tags"]
6730   REQ_BGL = False
6731
6732   def CheckPrereq(self):
6733     """Check prerequisites.
6734
6735     This checks that we have the given tag.
6736
6737     """
6738     TagsLU.CheckPrereq(self)
6739     for tag in self.op.tags:
6740       objects.TaggableObject.ValidateTag(tag)
6741     del_tags = frozenset(self.op.tags)
6742     cur_tags = self.target.GetTags()
6743     if not del_tags <= cur_tags:
6744       diff_tags = del_tags - cur_tags
6745       diff_names = ["'%s'" % tag for tag in diff_tags]
6746       diff_names.sort()
6747       raise errors.OpPrereqError("Tag(s) %s not found" %
6748                                  (",".join(diff_names)))
6749
6750   def Exec(self, feedback_fn):
6751     """Remove the tag from the object.
6752
6753     """
6754     for tag in self.op.tags:
6755       self.target.RemoveTag(tag)
6756     try:
6757       self.cfg.Update(self.target)
6758     except errors.ConfigurationError:
6759       raise errors.OpRetryError("There has been a modification to the"
6760                                 " config file and the operation has been"
6761                                 " aborted. Please retry.")
6762
6763
6764 class LUTestDelay(NoHooksLU):
6765   """Sleep for a specified amount of time.
6766
6767   This LU sleeps on the master and/or nodes for a specified amount of
6768   time.
6769
6770   """
6771   _OP_REQP = ["duration", "on_master", "on_nodes"]
6772   REQ_BGL = False
6773
6774   def ExpandNames(self):
6775     """Expand names and set required locks.
6776
6777     This expands the node list, if any.
6778
6779     """
6780     self.needed_locks = {}
6781     if self.op.on_nodes:
6782       # _GetWantedNodes can be used here, but is not always appropriate to use
6783       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6784       # more information.
6785       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6786       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6787
6788   def CheckPrereq(self):
6789     """Check prerequisites.
6790
6791     """
6792
6793   def Exec(self, feedback_fn):
6794     """Do the actual sleep.
6795
6796     """
6797     if self.op.on_master:
6798       if not utils.TestDelay(self.op.duration):
6799         raise errors.OpExecError("Error during master delay test")
6800     if self.op.on_nodes:
6801       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6802       for node, node_result in result.items():
6803         node_result.Raise("Failure during rpc call to node %s" % node)
6804
6805
6806 class IAllocator(object):
6807   """IAllocator framework.
6808
6809   An IAllocator instance has three sets of attributes:
6810     - cfg that is needed to query the cluster
6811     - input data (all members of the _KEYS class attribute are required)
6812     - four buffer attributes (in|out_data|text), that represent the
6813       input (to the external script) in text and data structure format,
6814       and the output from it, again in two formats
6815     - the result variables from the script (success, info, nodes) for
6816       easy usage
6817
6818   """
6819   _ALLO_KEYS = [
6820     "mem_size", "disks", "disk_template",
6821     "os", "tags", "nics", "vcpus", "hypervisor",
6822     ]
6823   _RELO_KEYS = [
6824     "relocate_from",
6825     ]
6826
6827   def __init__(self, lu, mode, name, **kwargs):
6828     self.lu = lu
6829     # init buffer variables
6830     self.in_text = self.out_text = self.in_data = self.out_data = None
6831     # init all input fields so that pylint is happy
6832     self.mode = mode
6833     self.name = name
6834     self.mem_size = self.disks = self.disk_template = None
6835     self.os = self.tags = self.nics = self.vcpus = None
6836     self.hypervisor = None
6837     self.relocate_from = None
6838     # computed fields
6839     self.required_nodes = None
6840     # init result fields
6841     self.success = self.info = self.nodes = None
6842     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6843       keyset = self._ALLO_KEYS
6844     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6845       keyset = self._RELO_KEYS
6846     else:
6847       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6848                                    " IAllocator" % self.mode)
6849     for key in kwargs:
6850       if key not in keyset:
6851         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6852                                      " IAllocator" % key)
6853       setattr(self, key, kwargs[key])
6854     for key in keyset:
6855       if key not in kwargs:
6856         raise errors.ProgrammerError("Missing input parameter '%s' to"
6857                                      " IAllocator" % key)
6858     self._BuildInputData()
6859
6860   def _ComputeClusterData(self):
6861     """Compute the generic allocator input data.
6862
6863     This is the data that is independent of the actual operation.
6864
6865     """
6866     cfg = self.lu.cfg
6867     cluster_info = cfg.GetClusterInfo()
6868     # cluster data
6869     data = {
6870       "version": constants.IALLOCATOR_VERSION,
6871       "cluster_name": cfg.GetClusterName(),
6872       "cluster_tags": list(cluster_info.GetTags()),
6873       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6874       # we don't have job IDs
6875       }
6876     iinfo = cfg.GetAllInstancesInfo().values()
6877     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6878
6879     # node data
6880     node_results = {}
6881     node_list = cfg.GetNodeList()
6882
6883     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6884       hypervisor_name = self.hypervisor
6885     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6886       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6887
6888     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6889                                            hypervisor_name)
6890     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6891                        cluster_info.enabled_hypervisors)
6892     for nname, nresult in node_data.items():
6893       # first fill in static (config-based) values
6894       ninfo = cfg.GetNodeInfo(nname)
6895       pnr = {
6896         "tags": list(ninfo.GetTags()),
6897         "primary_ip": ninfo.primary_ip,
6898         "secondary_ip": ninfo.secondary_ip,
6899         "offline": ninfo.offline,
6900         "drained": ninfo.drained,
6901         "master_candidate": ninfo.master_candidate,
6902         }
6903
6904       if not ninfo.offline:
6905         nresult.Raise("Can't get data for node %s" % nname)
6906         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6907                                 nname)
6908         remote_info = nresult.payload
6909         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6910                      'vg_size', 'vg_free', 'cpu_total']:
6911           if attr not in remote_info:
6912             raise errors.OpExecError("Node '%s' didn't return attribute"
6913                                      " '%s'" % (nname, attr))
6914           if not isinstance(remote_info[attr], int):
6915             raise errors.OpExecError("Node '%s' returned invalid value"
6916                                      " for '%s': %s" %
6917                                      (nname, attr, remote_info[attr]))
6918         # compute memory used by primary instances
6919         i_p_mem = i_p_up_mem = 0
6920         for iinfo, beinfo in i_list:
6921           if iinfo.primary_node == nname:
6922             i_p_mem += beinfo[constants.BE_MEMORY]
6923             if iinfo.name not in node_iinfo[nname].payload:
6924               i_used_mem = 0
6925             else:
6926               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6927             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6928             remote_info['memory_free'] -= max(0, i_mem_diff)
6929
6930             if iinfo.admin_up:
6931               i_p_up_mem += beinfo[constants.BE_MEMORY]
6932
6933         # compute memory used by instances
6934         pnr_dyn = {
6935           "total_memory": remote_info['memory_total'],
6936           "reserved_memory": remote_info['memory_dom0'],
6937           "free_memory": remote_info['memory_free'],
6938           "total_disk": remote_info['vg_size'],
6939           "free_disk": remote_info['vg_free'],
6940           "total_cpus": remote_info['cpu_total'],
6941           "i_pri_memory": i_p_mem,
6942           "i_pri_up_memory": i_p_up_mem,
6943           }
6944         pnr.update(pnr_dyn)
6945
6946       node_results[nname] = pnr
6947     data["nodes"] = node_results
6948
6949     # instance data
6950     instance_data = {}
6951     for iinfo, beinfo in i_list:
6952       nic_data = []
6953       for nic in iinfo.nics:
6954         filled_params = objects.FillDict(
6955             cluster_info.nicparams[constants.PP_DEFAULT],
6956             nic.nicparams)
6957         nic_dict = {"mac": nic.mac,
6958                     "ip": nic.ip,
6959                     "mode": filled_params[constants.NIC_MODE],
6960                     "link": filled_params[constants.NIC_LINK],
6961                    }
6962         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6963           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6964         nic_data.append(nic_dict)
6965       pir = {
6966         "tags": list(iinfo.GetTags()),
6967         "admin_up": iinfo.admin_up,
6968         "vcpus": beinfo[constants.BE_VCPUS],
6969         "memory": beinfo[constants.BE_MEMORY],
6970         "os": iinfo.os,
6971         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6972         "nics": nic_data,
6973         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6974         "disk_template": iinfo.disk_template,
6975         "hypervisor": iinfo.hypervisor,
6976         }
6977       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6978                                                  pir["disks"])
6979       instance_data[iinfo.name] = pir
6980
6981     data["instances"] = instance_data
6982
6983     self.in_data = data
6984
6985   def _AddNewInstance(self):
6986     """Add new instance data to allocator structure.
6987
6988     This in combination with _AllocatorGetClusterData will create the
6989     correct structure needed as input for the allocator.
6990
6991     The checks for the completeness of the opcode must have already been
6992     done.
6993
6994     """
6995     data = self.in_data
6996
6997     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6998
6999     if self.disk_template in constants.DTS_NET_MIRROR:
7000       self.required_nodes = 2
7001     else:
7002       self.required_nodes = 1
7003     request = {
7004       "type": "allocate",
7005       "name": self.name,
7006       "disk_template": self.disk_template,
7007       "tags": self.tags,
7008       "os": self.os,
7009       "vcpus": self.vcpus,
7010       "memory": self.mem_size,
7011       "disks": self.disks,
7012       "disk_space_total": disk_space,
7013       "nics": self.nics,
7014       "required_nodes": self.required_nodes,
7015       }
7016     data["request"] = request
7017
7018   def _AddRelocateInstance(self):
7019     """Add relocate instance data to allocator structure.
7020
7021     This in combination with _IAllocatorGetClusterData will create the
7022     correct structure needed as input for the allocator.
7023
7024     The checks for the completeness of the opcode must have already been
7025     done.
7026
7027     """
7028     instance = self.lu.cfg.GetInstanceInfo(self.name)
7029     if instance is None:
7030       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7031                                    " IAllocator" % self.name)
7032
7033     if instance.disk_template not in constants.DTS_NET_MIRROR:
7034       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7035
7036     if len(instance.secondary_nodes) != 1:
7037       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7038
7039     self.required_nodes = 1
7040     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7041     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7042
7043     request = {
7044       "type": "relocate",
7045       "name": self.name,
7046       "disk_space_total": disk_space,
7047       "required_nodes": self.required_nodes,
7048       "relocate_from": self.relocate_from,
7049       }
7050     self.in_data["request"] = request
7051
7052   def _BuildInputData(self):
7053     """Build input data structures.
7054
7055     """
7056     self._ComputeClusterData()
7057
7058     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7059       self._AddNewInstance()
7060     else:
7061       self._AddRelocateInstance()
7062
7063     self.in_text = serializer.Dump(self.in_data)
7064
7065   def Run(self, name, validate=True, call_fn=None):
7066     """Run an instance allocator and return the results.
7067
7068     """
7069     if call_fn is None:
7070       call_fn = self.lu.rpc.call_iallocator_runner
7071     data = self.in_text
7072
7073     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7074     result.Raise("Failure while running the iallocator script")
7075
7076     self.out_text = result.payload
7077     if validate:
7078       self._ValidateResult()
7079
7080   def _ValidateResult(self):
7081     """Process the allocator results.
7082
7083     This will process and if successful save the result in
7084     self.out_data and the other parameters.
7085
7086     """
7087     try:
7088       rdict = serializer.Load(self.out_text)
7089     except Exception, err:
7090       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7091
7092     if not isinstance(rdict, dict):
7093       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7094
7095     for key in "success", "info", "nodes":
7096       if key not in rdict:
7097         raise errors.OpExecError("Can't parse iallocator results:"
7098                                  " missing key '%s'" % key)
7099       setattr(self, key, rdict[key])
7100
7101     if not isinstance(rdict["nodes"], list):
7102       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7103                                " is not a list")
7104     self.out_data = rdict
7105
7106
7107 class LUTestAllocator(NoHooksLU):
7108   """Run allocator tests.
7109
7110   This LU runs the allocator tests
7111
7112   """
7113   _OP_REQP = ["direction", "mode", "name"]
7114
7115   def CheckPrereq(self):
7116     """Check prerequisites.
7117
7118     This checks the opcode parameters depending on the director and mode test.
7119
7120     """
7121     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7122       for attr in ["name", "mem_size", "disks", "disk_template",
7123                    "os", "tags", "nics", "vcpus"]:
7124         if not hasattr(self.op, attr):
7125           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7126                                      attr)
7127       iname = self.cfg.ExpandInstanceName(self.op.name)
7128       if iname is not None:
7129         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7130                                    iname)
7131       if not isinstance(self.op.nics, list):
7132         raise errors.OpPrereqError("Invalid parameter 'nics'")
7133       for row in self.op.nics:
7134         if (not isinstance(row, dict) or
7135             "mac" not in row or
7136             "ip" not in row or
7137             "bridge" not in row):
7138           raise errors.OpPrereqError("Invalid contents of the"
7139                                      " 'nics' parameter")
7140       if not isinstance(self.op.disks, list):
7141         raise errors.OpPrereqError("Invalid parameter 'disks'")
7142       for row in self.op.disks:
7143         if (not isinstance(row, dict) or
7144             "size" not in row or
7145             not isinstance(row["size"], int) or
7146             "mode" not in row or
7147             row["mode"] not in ['r', 'w']):
7148           raise errors.OpPrereqError("Invalid contents of the"
7149                                      " 'disks' parameter")
7150       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7151         self.op.hypervisor = self.cfg.GetHypervisorType()
7152     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7153       if not hasattr(self.op, "name"):
7154         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7155       fname = self.cfg.ExpandInstanceName(self.op.name)
7156       if fname is None:
7157         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7158                                    self.op.name)
7159       self.op.name = fname
7160       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7161     else:
7162       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7163                                  self.op.mode)
7164
7165     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7166       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7167         raise errors.OpPrereqError("Missing allocator name")
7168     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7169       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7170                                  self.op.direction)
7171
7172   def Exec(self, feedback_fn):
7173     """Run the allocator test.
7174
7175     """
7176     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7177       ial = IAllocator(self,
7178                        mode=self.op.mode,
7179                        name=self.op.name,
7180                        mem_size=self.op.mem_size,
7181                        disks=self.op.disks,
7182                        disk_template=self.op.disk_template,
7183                        os=self.op.os,
7184                        tags=self.op.tags,
7185                        nics=self.op.nics,
7186                        vcpus=self.op.vcpus,
7187                        hypervisor=self.op.hypervisor,
7188                        )
7189     else:
7190       ial = IAllocator(self,
7191                        mode=self.op.mode,
7192                        name=self.op.name,
7193                        relocate_from=list(self.relocate_from),
7194                        )
7195
7196     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7197       result = ial.in_text
7198     else:
7199       ial.Run(self.op.allocator, validate=False)
7200       result = ial.out_text
7201     return result