Rename the volume_list RPC call to lv_list
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import re
30 import platform
31 import logging
32 import copy
33
34 from ganeti import ssh
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import serializer
42 from ganeti import ssconf
43
44
45 class LogicalUnit(object):
46   """Logical Unit base class.
47
48   Subclasses must follow these rules:
49     - implement ExpandNames
50     - implement CheckPrereq
51     - implement Exec
52     - implement BuildHooksEnv
53     - redefine HPATH and HTYPE
54     - optionally redefine their run requirements:
55         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
56
57   Note that all commands require root permissions.
58
59   @ivar dry_run_result: the value (if any) that will be returned to the caller
60       in dry-run mode (signalled by opcode dry_run parameter)
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92     # support for dry-run
93     self.dry_run_result = None
94
95     for attr_name in self._OP_REQP:
96       attr_val = getattr(op, attr_name, None)
97       if attr_val is None:
98         raise errors.OpPrereqError("Required parameter '%s' missing" %
99                                    attr_name)
100     self.CheckArguments()
101
102   def __GetSSH(self):
103     """Returns the SshRunner object
104
105     """
106     if not self.__ssh:
107       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
108     return self.__ssh
109
110   ssh = property(fget=__GetSSH)
111
112   def CheckArguments(self):
113     """Check syntactic validity for the opcode arguments.
114
115     This method is for doing a simple syntactic check and ensure
116     validity of opcode parameters, without any cluster-related
117     checks. While the same can be accomplished in ExpandNames and/or
118     CheckPrereq, doing these separate is better because:
119
120       - ExpandNames is left as as purely a lock-related function
121       - CheckPrereq is run after we have aquired locks (and possible
122         waited for them)
123
124     The function is allowed to change the self.op attribute so that
125     later methods can no longer worry about missing parameters.
126
127     """
128     pass
129
130   def ExpandNames(self):
131     """Expand names for this LU.
132
133     This method is called before starting to execute the opcode, and it should
134     update all the parameters of the opcode to their canonical form (e.g. a
135     short node name must be fully expanded after this method has successfully
136     completed). This way locking, hooks, logging, ecc. can work correctly.
137
138     LUs which implement this method must also populate the self.needed_locks
139     member, as a dict with lock levels as keys, and a list of needed lock names
140     as values. Rules:
141
142       - use an empty dict if you don't need any lock
143       - if you don't need any lock at a particular level omit that level
144       - don't put anything for the BGL level
145       - if you want all locks at a level use locking.ALL_SET as a value
146
147     If you need to share locks (rather than acquire them exclusively) at one
148     level you can modify self.share_locks, setting a true value (usually 1) for
149     that level. By default locks are not shared.
150
151     Examples::
152
153       # Acquire all nodes and one instance
154       self.needed_locks = {
155         locking.LEVEL_NODE: locking.ALL_SET,
156         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
157       }
158       # Acquire just two nodes
159       self.needed_locks = {
160         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
161       }
162       # Acquire no locks
163       self.needed_locks = {} # No, you can't leave it to the default value None
164
165     """
166     # The implementation of this method is mandatory only if the new LU is
167     # concurrent, so that old LUs don't need to be changed all at the same
168     # time.
169     if self.REQ_BGL:
170       self.needed_locks = {} # Exclusive LUs don't need locks.
171     else:
172       raise NotImplementedError
173
174   def DeclareLocks(self, level):
175     """Declare LU locking needs for a level
176
177     While most LUs can just declare their locking needs at ExpandNames time,
178     sometimes there's the need to calculate some locks after having acquired
179     the ones before. This function is called just before acquiring locks at a
180     particular level, but after acquiring the ones at lower levels, and permits
181     such calculations. It can be used to modify self.needed_locks, and by
182     default it does nothing.
183
184     This function is only called if you have something already set in
185     self.needed_locks for the level.
186
187     @param level: Locking level which is going to be locked
188     @type level: member of ganeti.locking.LEVELS
189
190     """
191
192   def CheckPrereq(self):
193     """Check prerequisites for this LU.
194
195     This method should check that the prerequisites for the execution
196     of this LU are fulfilled. It can do internode communication, but
197     it should be idempotent - no cluster or system changes are
198     allowed.
199
200     The method should raise errors.OpPrereqError in case something is
201     not fulfilled. Its return value is ignored.
202
203     This method should also update all the parameters of the opcode to
204     their canonical form if it hasn't been done by ExpandNames before.
205
206     """
207     raise NotImplementedError
208
209   def Exec(self, feedback_fn):
210     """Execute the LU.
211
212     This method should implement the actual work. It should raise
213     errors.OpExecError for failures that are somewhat dealt with in
214     code, or expected.
215
216     """
217     raise NotImplementedError
218
219   def BuildHooksEnv(self):
220     """Build hooks environment for this LU.
221
222     This method should return a three-node tuple consisting of: a dict
223     containing the environment that will be used for running the
224     specific hook for this LU, a list of node names on which the hook
225     should run before the execution, and a list of node names on which
226     the hook should run after the execution.
227
228     The keys of the dict must not have 'GANETI_' prefixed as this will
229     be handled in the hooks runner. Also note additional keys will be
230     added by the hooks runner. If the LU doesn't define any
231     environment, an empty dict (and not None) should be returned.
232
233     No nodes should be returned as an empty list (and not None).
234
235     Note that if the HPATH for a LU class is None, this function will
236     not be called.
237
238     """
239     raise NotImplementedError
240
241   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
242     """Notify the LU about the results of its hooks.
243
244     This method is called every time a hooks phase is executed, and notifies
245     the Logical Unit about the hooks' result. The LU can then use it to alter
246     its result based on the hooks.  By default the method does nothing and the
247     previous result is passed back unchanged but any LU can define it if it
248     wants to use the local cluster hook-scripts somehow.
249
250     @param phase: one of L{constants.HOOKS_PHASE_POST} or
251         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
252     @param hook_results: the results of the multi-node hooks rpc call
253     @param feedback_fn: function used send feedback back to the caller
254     @param lu_result: the previous Exec result this LU had, or None
255         in the PRE phase
256     @return: the new Exec result, based on the previous result
257         and hook results
258
259     """
260     return lu_result
261
262   def _ExpandAndLockInstance(self):
263     """Helper function to expand and lock an instance.
264
265     Many LUs that work on an instance take its name in self.op.instance_name
266     and need to expand it and then declare the expanded name for locking. This
267     function does it, and then updates self.op.instance_name to the expanded
268     name. It also initializes needed_locks as a dict, if this hasn't been done
269     before.
270
271     """
272     if self.needed_locks is None:
273       self.needed_locks = {}
274     else:
275       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
276         "_ExpandAndLockInstance called with instance-level locks set"
277     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
278     if expanded_name is None:
279       raise errors.OpPrereqError("Instance '%s' not known" %
280                                   self.op.instance_name)
281     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
282     self.op.instance_name = expanded_name
283
284   def _LockInstancesNodes(self, primary_only=False):
285     """Helper function to declare instances' nodes for locking.
286
287     This function should be called after locking one or more instances to lock
288     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
289     with all primary or secondary nodes for instances already locked and
290     present in self.needed_locks[locking.LEVEL_INSTANCE].
291
292     It should be called from DeclareLocks, and for safety only works if
293     self.recalculate_locks[locking.LEVEL_NODE] is set.
294
295     In the future it may grow parameters to just lock some instance's nodes, or
296     to just lock primaries or secondary nodes, if needed.
297
298     If should be called in DeclareLocks in a way similar to::
299
300       if level == locking.LEVEL_NODE:
301         self._LockInstancesNodes()
302
303     @type primary_only: boolean
304     @param primary_only: only lock primary nodes of locked instances
305
306     """
307     assert locking.LEVEL_NODE in self.recalculate_locks, \
308       "_LockInstancesNodes helper function called with no nodes to recalculate"
309
310     # TODO: check if we're really been called with the instance locks held
311
312     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
313     # future we might want to have different behaviors depending on the value
314     # of self.recalculate_locks[locking.LEVEL_NODE]
315     wanted_nodes = []
316     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
317       instance = self.context.cfg.GetInstanceInfo(instance_name)
318       wanted_nodes.append(instance.primary_node)
319       if not primary_only:
320         wanted_nodes.extend(instance.secondary_nodes)
321
322     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
323       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
324     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
325       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
326
327     del self.recalculate_locks[locking.LEVEL_NODE]
328
329
330 class NoHooksLU(LogicalUnit):
331   """Simple LU which runs no hooks.
332
333   This LU is intended as a parent for other LogicalUnits which will
334   run no hooks, in order to reduce duplicate code.
335
336   """
337   HPATH = None
338   HTYPE = None
339
340
341 def _GetWantedNodes(lu, nodes):
342   """Returns list of checked and expanded node names.
343
344   @type lu: L{LogicalUnit}
345   @param lu: the logical unit on whose behalf we execute
346   @type nodes: list
347   @param nodes: list of node names or None for all nodes
348   @rtype: list
349   @return: the list of nodes, sorted
350   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
351
352   """
353   if not isinstance(nodes, list):
354     raise errors.OpPrereqError("Invalid argument type 'nodes'")
355
356   if not nodes:
357     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
358       " non-empty list of nodes whose name is to be expanded.")
359
360   wanted = []
361   for name in nodes:
362     node = lu.cfg.ExpandNodeName(name)
363     if node is None:
364       raise errors.OpPrereqError("No such node name '%s'" % name)
365     wanted.append(node)
366
367   return utils.NiceSort(wanted)
368
369
370 def _GetWantedInstances(lu, instances):
371   """Returns list of checked and expanded instance names.
372
373   @type lu: L{LogicalUnit}
374   @param lu: the logical unit on whose behalf we execute
375   @type instances: list
376   @param instances: list of instance names or None for all instances
377   @rtype: list
378   @return: the list of instances, sorted
379   @raise errors.OpPrereqError: if the instances parameter is wrong type
380   @raise errors.OpPrereqError: if any of the passed instances is not found
381
382   """
383   if not isinstance(instances, list):
384     raise errors.OpPrereqError("Invalid argument type 'instances'")
385
386   if instances:
387     wanted = []
388
389     for name in instances:
390       instance = lu.cfg.ExpandInstanceName(name)
391       if instance is None:
392         raise errors.OpPrereqError("No such instance name '%s'" % name)
393       wanted.append(instance)
394
395   else:
396     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
397   return wanted
398
399
400 def _CheckOutputFields(static, dynamic, selected):
401   """Checks whether all selected fields are valid.
402
403   @type static: L{utils.FieldSet}
404   @param static: static fields set
405   @type dynamic: L{utils.FieldSet}
406   @param dynamic: dynamic fields set
407
408   """
409   f = utils.FieldSet()
410   f.Extend(static)
411   f.Extend(dynamic)
412
413   delta = f.NonMatching(selected)
414   if delta:
415     raise errors.OpPrereqError("Unknown output fields selected: %s"
416                                % ",".join(delta))
417
418
419 def _CheckBooleanOpField(op, name):
420   """Validates boolean opcode parameters.
421
422   This will ensure that an opcode parameter is either a boolean value,
423   or None (but that it always exists).
424
425   """
426   val = getattr(op, name, None)
427   if not (val is None or isinstance(val, bool)):
428     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
429                                (name, str(val)))
430   setattr(op, name, val)
431
432
433 def _CheckNodeOnline(lu, node):
434   """Ensure that a given node is online.
435
436   @param lu: the LU on behalf of which we make the check
437   @param node: the node to check
438   @raise errors.OpPrereqError: if the node is offline
439
440   """
441   if lu.cfg.GetNodeInfo(node).offline:
442     raise errors.OpPrereqError("Can't use offline node %s" % node)
443
444
445 def _CheckNodeNotDrained(lu, node):
446   """Ensure that a given node is not drained.
447
448   @param lu: the LU on behalf of which we make the check
449   @param node: the node to check
450   @raise errors.OpPrereqError: if the node is drained
451
452   """
453   if lu.cfg.GetNodeInfo(node).drained:
454     raise errors.OpPrereqError("Can't use drained node %s" % node)
455
456
457 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
458                           memory, vcpus, nics, disk_template, disks,
459                           bep, hvp, hypervisor):
460   """Builds instance related env variables for hooks
461
462   This builds the hook environment from individual variables.
463
464   @type name: string
465   @param name: the name of the instance
466   @type primary_node: string
467   @param primary_node: the name of the instance's primary node
468   @type secondary_nodes: list
469   @param secondary_nodes: list of secondary nodes as strings
470   @type os_type: string
471   @param os_type: the name of the instance's OS
472   @type status: boolean
473   @param status: the should_run status of the instance
474   @type memory: string
475   @param memory: the memory size of the instance
476   @type vcpus: string
477   @param vcpus: the count of VCPUs the instance has
478   @type nics: list
479   @param nics: list of tuples (ip, mac, mode, link) representing
480       the NICs the instance has
481   @type disk_template: string
482   @param disk_template: the distk template of the instance
483   @type disks: list
484   @param disks: the list of (size, mode) pairs
485   @type bep: dict
486   @param bep: the backend parameters for the instance
487   @type hvp: dict
488   @param hvp: the hypervisor parameters for the instance
489   @type hypervisor: string
490   @param hypervisor: the hypervisor for the instance
491   @rtype: dict
492   @return: the hook environment for this instance
493
494   """
495   if status:
496     str_status = "up"
497   else:
498     str_status = "down"
499   env = {
500     "OP_TARGET": name,
501     "INSTANCE_NAME": name,
502     "INSTANCE_PRIMARY": primary_node,
503     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
504     "INSTANCE_OS_TYPE": os_type,
505     "INSTANCE_STATUS": str_status,
506     "INSTANCE_MEMORY": memory,
507     "INSTANCE_VCPUS": vcpus,
508     "INSTANCE_DISK_TEMPLATE": disk_template,
509     "INSTANCE_HYPERVISOR": hypervisor,
510   }
511
512   if nics:
513     nic_count = len(nics)
514     for idx, (ip, mac, mode, link) in enumerate(nics):
515       if ip is None:
516         ip = ""
517       env["INSTANCE_NIC%d_IP" % idx] = ip
518       env["INSTANCE_NIC%d_MAC" % idx] = mac
519       env["INSTANCE_NIC%d_MODE" % idx] = mode
520       env["INSTANCE_NIC%d_LINK" % idx] = link
521       if mode == constants.NIC_MODE_BRIDGED:
522         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
523   else:
524     nic_count = 0
525
526   env["INSTANCE_NIC_COUNT"] = nic_count
527
528   if disks:
529     disk_count = len(disks)
530     for idx, (size, mode) in enumerate(disks):
531       env["INSTANCE_DISK%d_SIZE" % idx] = size
532       env["INSTANCE_DISK%d_MODE" % idx] = mode
533   else:
534     disk_count = 0
535
536   env["INSTANCE_DISK_COUNT"] = disk_count
537
538   for source, kind in [(bep, "BE"), (hvp, "HV")]:
539     for key, value in source.items():
540       env["INSTANCE_%s_%s" % (kind, key)] = value
541
542   return env
543
544 def _NICListToTuple(lu, nics):
545   """Build a list of nic information tuples.
546
547   This list is suitable to be passed to _BuildInstanceHookEnv or as a return
548   value in LUQueryInstanceData.
549
550   @type lu:  L{LogicalUnit}
551   @param lu: the logical unit on whose behalf we execute
552   @type nics: list of L{objects.NIC}
553   @param nics: list of nics to convert to hooks tuples
554
555   """
556   hooks_nics = []
557   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
558   for nic in nics:
559     ip = nic.ip
560     mac = nic.mac
561     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
562     mode = filled_params[constants.NIC_MODE]
563     link = filled_params[constants.NIC_LINK]
564     hooks_nics.append((ip, mac, mode, link))
565   return hooks_nics
566
567 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
568   """Builds instance related env variables for hooks from an object.
569
570   @type lu: L{LogicalUnit}
571   @param lu: the logical unit on whose behalf we execute
572   @type instance: L{objects.Instance}
573   @param instance: the instance for which we should build the
574       environment
575   @type override: dict
576   @param override: dictionary with key/values that will override
577       our values
578   @rtype: dict
579   @return: the hook environment dictionary
580
581   """
582   cluster = lu.cfg.GetClusterInfo()
583   bep = cluster.FillBE(instance)
584   hvp = cluster.FillHV(instance)
585   args = {
586     'name': instance.name,
587     'primary_node': instance.primary_node,
588     'secondary_nodes': instance.secondary_nodes,
589     'os_type': instance.os,
590     'status': instance.admin_up,
591     'memory': bep[constants.BE_MEMORY],
592     'vcpus': bep[constants.BE_VCPUS],
593     'nics': _NICListToTuple(lu, instance.nics),
594     'disk_template': instance.disk_template,
595     'disks': [(disk.size, disk.mode) for disk in instance.disks],
596     'bep': bep,
597     'hvp': hvp,
598     'hypervisor': instance.hypervisor,
599   }
600   if override:
601     args.update(override)
602   return _BuildInstanceHookEnv(**args)
603
604
605 def _AdjustCandidatePool(lu):
606   """Adjust the candidate pool after node operations.
607
608   """
609   mod_list = lu.cfg.MaintainCandidatePool()
610   if mod_list:
611     lu.LogInfo("Promoted nodes to master candidate role: %s",
612                ", ".join(node.name for node in mod_list))
613     for name in mod_list:
614       lu.context.ReaddNode(name)
615   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
616   if mc_now > mc_max:
617     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
618                (mc_now, mc_max))
619
620
621 def _CheckNicsBridgesExist(lu, target_nics, target_node,
622                                profile=constants.PP_DEFAULT):
623   """Check that the brigdes needed by a list of nics exist.
624
625   """
626   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
627   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
628                 for nic in target_nics]
629   brlist = [params[constants.NIC_LINK] for params in paramslist
630             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
631   if brlist:
632     result = lu.rpc.call_bridges_exist(target_node, brlist)
633     result.Raise("Error checking bridges on destination node '%s'" %
634                  target_node, prereq=True)
635
636
637 def _CheckInstanceBridgesExist(lu, instance, node=None):
638   """Check that the brigdes needed by an instance exist.
639
640   """
641   if node is None:
642     node = instance.primary_node
643   _CheckNicsBridgesExist(lu, instance.nics, node)
644
645
646 class LUDestroyCluster(NoHooksLU):
647   """Logical unit for destroying the cluster.
648
649   """
650   _OP_REQP = []
651
652   def CheckPrereq(self):
653     """Check prerequisites.
654
655     This checks whether the cluster is empty.
656
657     Any errors are signalled by raising errors.OpPrereqError.
658
659     """
660     master = self.cfg.GetMasterNode()
661
662     nodelist = self.cfg.GetNodeList()
663     if len(nodelist) != 1 or nodelist[0] != master:
664       raise errors.OpPrereqError("There are still %d node(s) in"
665                                  " this cluster." % (len(nodelist) - 1))
666     instancelist = self.cfg.GetInstanceList()
667     if instancelist:
668       raise errors.OpPrereqError("There are still %d instance(s) in"
669                                  " this cluster." % len(instancelist))
670
671   def Exec(self, feedback_fn):
672     """Destroys the cluster.
673
674     """
675     master = self.cfg.GetMasterNode()
676     result = self.rpc.call_node_stop_master(master, False)
677     result.Raise("Could not disable the master role")
678     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
679     utils.CreateBackup(priv_key)
680     utils.CreateBackup(pub_key)
681     return master
682
683
684 class LUVerifyCluster(LogicalUnit):
685   """Verifies the cluster status.
686
687   """
688   HPATH = "cluster-verify"
689   HTYPE = constants.HTYPE_CLUSTER
690   _OP_REQP = ["skip_checks"]
691   REQ_BGL = False
692
693   def ExpandNames(self):
694     self.needed_locks = {
695       locking.LEVEL_NODE: locking.ALL_SET,
696       locking.LEVEL_INSTANCE: locking.ALL_SET,
697     }
698     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
699
700   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
701                   node_result, feedback_fn, master_files,
702                   drbd_map, vg_name):
703     """Run multiple tests against a node.
704
705     Test list:
706
707       - compares ganeti version
708       - checks vg existance and size > 20G
709       - checks config file checksum
710       - checks ssh to other nodes
711
712     @type nodeinfo: L{objects.Node}
713     @param nodeinfo: the node to check
714     @param file_list: required list of files
715     @param local_cksum: dictionary of local files and their checksums
716     @param node_result: the results from the node
717     @param feedback_fn: function used to accumulate results
718     @param master_files: list of files that only masters should have
719     @param drbd_map: the useddrbd minors for this node, in
720         form of minor: (instance, must_exist) which correspond to instances
721         and their running status
722     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
723
724     """
725     node = nodeinfo.name
726
727     # main result, node_result should be a non-empty dict
728     if not node_result or not isinstance(node_result, dict):
729       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
730       return True
731
732     # compares ganeti version
733     local_version = constants.PROTOCOL_VERSION
734     remote_version = node_result.get('version', None)
735     if not (remote_version and isinstance(remote_version, (list, tuple)) and
736             len(remote_version) == 2):
737       feedback_fn("  - ERROR: connection to %s failed" % (node))
738       return True
739
740     if local_version != remote_version[0]:
741       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
742                   " node %s %s" % (local_version, node, remote_version[0]))
743       return True
744
745     # node seems compatible, we can actually try to look into its results
746
747     bad = False
748
749     # full package version
750     if constants.RELEASE_VERSION != remote_version[1]:
751       feedback_fn("  - WARNING: software version mismatch: master %s,"
752                   " node %s %s" %
753                   (constants.RELEASE_VERSION, node, remote_version[1]))
754
755     # checks vg existence and size > 20G
756     if vg_name is not None:
757       vglist = node_result.get(constants.NV_VGLIST, None)
758       if not vglist:
759         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
760                         (node,))
761         bad = True
762       else:
763         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
764                                               constants.MIN_VG_SIZE)
765         if vgstatus:
766           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
767           bad = True
768
769     # checks config file checksum
770
771     remote_cksum = node_result.get(constants.NV_FILELIST, None)
772     if not isinstance(remote_cksum, dict):
773       bad = True
774       feedback_fn("  - ERROR: node hasn't returned file checksum data")
775     else:
776       for file_name in file_list:
777         node_is_mc = nodeinfo.master_candidate
778         must_have_file = file_name not in master_files
779         if file_name not in remote_cksum:
780           if node_is_mc or must_have_file:
781             bad = True
782             feedback_fn("  - ERROR: file '%s' missing" % file_name)
783         elif remote_cksum[file_name] != local_cksum[file_name]:
784           if node_is_mc or must_have_file:
785             bad = True
786             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
787           else:
788             # not candidate and this is not a must-have file
789             bad = True
790             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
791                         " '%s'" % file_name)
792         else:
793           # all good, except non-master/non-must have combination
794           if not node_is_mc and not must_have_file:
795             feedback_fn("  - ERROR: file '%s' should not exist on non master"
796                         " candidates" % file_name)
797
798     # checks ssh to any
799
800     if constants.NV_NODELIST not in node_result:
801       bad = True
802       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
803     else:
804       if node_result[constants.NV_NODELIST]:
805         bad = True
806         for node in node_result[constants.NV_NODELIST]:
807           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
808                           (node, node_result[constants.NV_NODELIST][node]))
809
810     if constants.NV_NODENETTEST not in node_result:
811       bad = True
812       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
813     else:
814       if node_result[constants.NV_NODENETTEST]:
815         bad = True
816         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
817         for node in nlist:
818           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
819                           (node, node_result[constants.NV_NODENETTEST][node]))
820
821     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
822     if isinstance(hyp_result, dict):
823       for hv_name, hv_result in hyp_result.iteritems():
824         if hv_result is not None:
825           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
826                       (hv_name, hv_result))
827
828     # check used drbd list
829     if vg_name is not None:
830       used_minors = node_result.get(constants.NV_DRBDLIST, [])
831       if not isinstance(used_minors, (tuple, list)):
832         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
833                     str(used_minors))
834       else:
835         for minor, (iname, must_exist) in drbd_map.items():
836           if minor not in used_minors and must_exist:
837             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
838                         " not active" % (minor, iname))
839             bad = True
840         for minor in used_minors:
841           if minor not in drbd_map:
842             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
843                         minor)
844             bad = True
845
846     return bad
847
848   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
849                       node_instance, feedback_fn, n_offline):
850     """Verify an instance.
851
852     This function checks to see if the required block devices are
853     available on the instance's node.
854
855     """
856     bad = False
857
858     node_current = instanceconfig.primary_node
859
860     node_vol_should = {}
861     instanceconfig.MapLVsByNode(node_vol_should)
862
863     for node in node_vol_should:
864       if node in n_offline:
865         # ignore missing volumes on offline nodes
866         continue
867       for volume in node_vol_should[node]:
868         if node not in node_vol_is or volume not in node_vol_is[node]:
869           feedback_fn("  - ERROR: volume %s missing on node %s" %
870                           (volume, node))
871           bad = True
872
873     if instanceconfig.admin_up:
874       if ((node_current not in node_instance or
875           not instance in node_instance[node_current]) and
876           node_current not in n_offline):
877         feedback_fn("  - ERROR: instance %s not running on node %s" %
878                         (instance, node_current))
879         bad = True
880
881     for node in node_instance:
882       if (not node == node_current):
883         if instance in node_instance[node]:
884           feedback_fn("  - ERROR: instance %s should not run on node %s" %
885                           (instance, node))
886           bad = True
887
888     return bad
889
890   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
891     """Verify if there are any unknown volumes in the cluster.
892
893     The .os, .swap and backup volumes are ignored. All other volumes are
894     reported as unknown.
895
896     """
897     bad = False
898
899     for node in node_vol_is:
900       for volume in node_vol_is[node]:
901         if node not in node_vol_should or volume not in node_vol_should[node]:
902           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
903                       (volume, node))
904           bad = True
905     return bad
906
907   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
908     """Verify the list of running instances.
909
910     This checks what instances are running but unknown to the cluster.
911
912     """
913     bad = False
914     for node in node_instance:
915       for runninginstance in node_instance[node]:
916         if runninginstance not in instancelist:
917           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
918                           (runninginstance, node))
919           bad = True
920     return bad
921
922   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
923     """Verify N+1 Memory Resilience.
924
925     Check that if one single node dies we can still start all the instances it
926     was primary for.
927
928     """
929     bad = False
930
931     for node, nodeinfo in node_info.iteritems():
932       # This code checks that every node which is now listed as secondary has
933       # enough memory to host all instances it is supposed to should a single
934       # other node in the cluster fail.
935       # FIXME: not ready for failover to an arbitrary node
936       # FIXME: does not support file-backed instances
937       # WARNING: we currently take into account down instances as well as up
938       # ones, considering that even if they're down someone might want to start
939       # them even in the event of a node failure.
940       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
941         needed_mem = 0
942         for instance in instances:
943           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
944           if bep[constants.BE_AUTO_BALANCE]:
945             needed_mem += bep[constants.BE_MEMORY]
946         if nodeinfo['mfree'] < needed_mem:
947           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
948                       " failovers should node %s fail" % (node, prinode))
949           bad = True
950     return bad
951
952   def CheckPrereq(self):
953     """Check prerequisites.
954
955     Transform the list of checks we're going to skip into a set and check that
956     all its members are valid.
957
958     """
959     self.skip_set = frozenset(self.op.skip_checks)
960     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
961       raise errors.OpPrereqError("Invalid checks to be skipped specified")
962
963   def BuildHooksEnv(self):
964     """Build hooks env.
965
966     Cluster-Verify hooks just rone in the post phase and their failure makes
967     the output be logged in the verify output and the verification to fail.
968
969     """
970     all_nodes = self.cfg.GetNodeList()
971     env = {
972       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
973       }
974     for node in self.cfg.GetAllNodesInfo().values():
975       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
976
977     return env, [], all_nodes
978
979   def Exec(self, feedback_fn):
980     """Verify integrity of cluster, performing various test on nodes.
981
982     """
983     bad = False
984     feedback_fn("* Verifying global settings")
985     for msg in self.cfg.VerifyConfig():
986       feedback_fn("  - ERROR: %s" % msg)
987
988     vg_name = self.cfg.GetVGName()
989     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
990     nodelist = utils.NiceSort(self.cfg.GetNodeList())
991     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
992     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
993     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
994                         for iname in instancelist)
995     i_non_redundant = [] # Non redundant instances
996     i_non_a_balanced = [] # Non auto-balanced instances
997     n_offline = [] # List of offline nodes
998     n_drained = [] # List of nodes being drained
999     node_volume = {}
1000     node_instance = {}
1001     node_info = {}
1002     instance_cfg = {}
1003
1004     # FIXME: verify OS list
1005     # do local checksums
1006     master_files = [constants.CLUSTER_CONF_FILE]
1007
1008     file_names = ssconf.SimpleStore().GetFileList()
1009     file_names.append(constants.SSL_CERT_FILE)
1010     file_names.append(constants.RAPI_CERT_FILE)
1011     file_names.extend(master_files)
1012
1013     local_checksums = utils.FingerprintFiles(file_names)
1014
1015     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1016     node_verify_param = {
1017       constants.NV_FILELIST: file_names,
1018       constants.NV_NODELIST: [node.name for node in nodeinfo
1019                               if not node.offline],
1020       constants.NV_HYPERVISOR: hypervisors,
1021       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1022                                   node.secondary_ip) for node in nodeinfo
1023                                  if not node.offline],
1024       constants.NV_INSTANCELIST: hypervisors,
1025       constants.NV_VERSION: None,
1026       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1027       }
1028     if vg_name is not None:
1029       node_verify_param[constants.NV_VGLIST] = None
1030       node_verify_param[constants.NV_LVLIST] = vg_name
1031       node_verify_param[constants.NV_DRBDLIST] = None
1032     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1033                                            self.cfg.GetClusterName())
1034
1035     cluster = self.cfg.GetClusterInfo()
1036     master_node = self.cfg.GetMasterNode()
1037     all_drbd_map = self.cfg.ComputeDRBDMap()
1038
1039     for node_i in nodeinfo:
1040       node = node_i.name
1041
1042       if node_i.offline:
1043         feedback_fn("* Skipping offline node %s" % (node,))
1044         n_offline.append(node)
1045         continue
1046
1047       if node == master_node:
1048         ntype = "master"
1049       elif node_i.master_candidate:
1050         ntype = "master candidate"
1051       elif node_i.drained:
1052         ntype = "drained"
1053         n_drained.append(node)
1054       else:
1055         ntype = "regular"
1056       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1057
1058       msg = all_nvinfo[node].fail_msg
1059       if msg:
1060         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1061         bad = True
1062         continue
1063
1064       nresult = all_nvinfo[node].payload
1065       node_drbd = {}
1066       for minor, instance in all_drbd_map[node].items():
1067         if instance not in instanceinfo:
1068           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1069                       instance)
1070           # ghost instance should not be running, but otherwise we
1071           # don't give double warnings (both ghost instance and
1072           # unallocated minor in use)
1073           node_drbd[minor] = (instance, False)
1074         else:
1075           instance = instanceinfo[instance]
1076           node_drbd[minor] = (instance.name, instance.admin_up)
1077       result = self._VerifyNode(node_i, file_names, local_checksums,
1078                                 nresult, feedback_fn, master_files,
1079                                 node_drbd, vg_name)
1080       bad = bad or result
1081
1082       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1083       if vg_name is None:
1084         node_volume[node] = {}
1085       elif isinstance(lvdata, basestring):
1086         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1087                     (node, utils.SafeEncode(lvdata)))
1088         bad = True
1089         node_volume[node] = {}
1090       elif not isinstance(lvdata, dict):
1091         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1092         bad = True
1093         continue
1094       else:
1095         node_volume[node] = lvdata
1096
1097       # node_instance
1098       idata = nresult.get(constants.NV_INSTANCELIST, None)
1099       if not isinstance(idata, list):
1100         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1101                     (node,))
1102         bad = True
1103         continue
1104
1105       node_instance[node] = idata
1106
1107       # node_info
1108       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1109       if not isinstance(nodeinfo, dict):
1110         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1111         bad = True
1112         continue
1113
1114       try:
1115         node_info[node] = {
1116           "mfree": int(nodeinfo['memory_free']),
1117           "pinst": [],
1118           "sinst": [],
1119           # dictionary holding all instances this node is secondary for,
1120           # grouped by their primary node. Each key is a cluster node, and each
1121           # value is a list of instances which have the key as primary and the
1122           # current node as secondary.  this is handy to calculate N+1 memory
1123           # availability if you can only failover from a primary to its
1124           # secondary.
1125           "sinst-by-pnode": {},
1126         }
1127         # FIXME: devise a free space model for file based instances as well
1128         if vg_name is not None:
1129           if (constants.NV_VGLIST not in nresult or
1130               vg_name not in nresult[constants.NV_VGLIST]):
1131             feedback_fn("  - ERROR: node %s didn't return data for the"
1132                         " volume group '%s' - it is either missing or broken" %
1133                         (node, vg_name))
1134             bad = True
1135             continue
1136           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1137       except (ValueError, KeyError):
1138         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1139                     " from node %s" % (node,))
1140         bad = True
1141         continue
1142
1143     node_vol_should = {}
1144
1145     for instance in instancelist:
1146       feedback_fn("* Verifying instance %s" % instance)
1147       inst_config = instanceinfo[instance]
1148       result =  self._VerifyInstance(instance, inst_config, node_volume,
1149                                      node_instance, feedback_fn, n_offline)
1150       bad = bad or result
1151       inst_nodes_offline = []
1152
1153       inst_config.MapLVsByNode(node_vol_should)
1154
1155       instance_cfg[instance] = inst_config
1156
1157       pnode = inst_config.primary_node
1158       if pnode in node_info:
1159         node_info[pnode]['pinst'].append(instance)
1160       elif pnode not in n_offline:
1161         feedback_fn("  - ERROR: instance %s, connection to primary node"
1162                     " %s failed" % (instance, pnode))
1163         bad = True
1164
1165       if pnode in n_offline:
1166         inst_nodes_offline.append(pnode)
1167
1168       # If the instance is non-redundant we cannot survive losing its primary
1169       # node, so we are not N+1 compliant. On the other hand we have no disk
1170       # templates with more than one secondary so that situation is not well
1171       # supported either.
1172       # FIXME: does not support file-backed instances
1173       if len(inst_config.secondary_nodes) == 0:
1174         i_non_redundant.append(instance)
1175       elif len(inst_config.secondary_nodes) > 1:
1176         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1177                     % instance)
1178
1179       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1180         i_non_a_balanced.append(instance)
1181
1182       for snode in inst_config.secondary_nodes:
1183         if snode in node_info:
1184           node_info[snode]['sinst'].append(instance)
1185           if pnode not in node_info[snode]['sinst-by-pnode']:
1186             node_info[snode]['sinst-by-pnode'][pnode] = []
1187           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1188         elif snode not in n_offline:
1189           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1190                       " %s failed" % (instance, snode))
1191           bad = True
1192         if snode in n_offline:
1193           inst_nodes_offline.append(snode)
1194
1195       if inst_nodes_offline:
1196         # warn that the instance lives on offline nodes, and set bad=True
1197         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1198                     ", ".join(inst_nodes_offline))
1199         bad = True
1200
1201     feedback_fn("* Verifying orphan volumes")
1202     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1203                                        feedback_fn)
1204     bad = bad or result
1205
1206     feedback_fn("* Verifying remaining instances")
1207     result = self._VerifyOrphanInstances(instancelist, node_instance,
1208                                          feedback_fn)
1209     bad = bad or result
1210
1211     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1212       feedback_fn("* Verifying N+1 Memory redundancy")
1213       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1214       bad = bad or result
1215
1216     feedback_fn("* Other Notes")
1217     if i_non_redundant:
1218       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1219                   % len(i_non_redundant))
1220
1221     if i_non_a_balanced:
1222       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1223                   % len(i_non_a_balanced))
1224
1225     if n_offline:
1226       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1227
1228     if n_drained:
1229       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1230
1231     return not bad
1232
1233   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1234     """Analize the post-hooks' result
1235
1236     This method analyses the hook result, handles it, and sends some
1237     nicely-formatted feedback back to the user.
1238
1239     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1240         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1241     @param hooks_results: the results of the multi-node hooks rpc call
1242     @param feedback_fn: function used send feedback back to the caller
1243     @param lu_result: previous Exec result
1244     @return: the new Exec result, based on the previous result
1245         and hook results
1246
1247     """
1248     # We only really run POST phase hooks, and are only interested in
1249     # their results
1250     if phase == constants.HOOKS_PHASE_POST:
1251       # Used to change hooks' output to proper indentation
1252       indent_re = re.compile('^', re.M)
1253       feedback_fn("* Hooks Results")
1254       if not hooks_results:
1255         feedback_fn("  - ERROR: general communication failure")
1256         lu_result = 1
1257       else:
1258         for node_name in hooks_results:
1259           show_node_header = True
1260           res = hooks_results[node_name]
1261           msg = res.fail_msg
1262           if msg:
1263             if res.offline:
1264               # no need to warn or set fail return value
1265               continue
1266             feedback_fn("    Communication failure in hooks execution: %s" %
1267                         msg)
1268             lu_result = 1
1269             continue
1270           for script, hkr, output in res.payload:
1271             if hkr == constants.HKR_FAIL:
1272               # The node header is only shown once, if there are
1273               # failing hooks on that node
1274               if show_node_header:
1275                 feedback_fn("  Node %s:" % node_name)
1276                 show_node_header = False
1277               feedback_fn("    ERROR: Script %s failed, output:" % script)
1278               output = indent_re.sub('      ', output)
1279               feedback_fn("%s" % output)
1280               lu_result = 1
1281
1282       return lu_result
1283
1284
1285 class LUVerifyDisks(NoHooksLU):
1286   """Verifies the cluster disks status.
1287
1288   """
1289   _OP_REQP = []
1290   REQ_BGL = False
1291
1292   def ExpandNames(self):
1293     self.needed_locks = {
1294       locking.LEVEL_NODE: locking.ALL_SET,
1295       locking.LEVEL_INSTANCE: locking.ALL_SET,
1296     }
1297     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1298
1299   def CheckPrereq(self):
1300     """Check prerequisites.
1301
1302     This has no prerequisites.
1303
1304     """
1305     pass
1306
1307   def Exec(self, feedback_fn):
1308     """Verify integrity of cluster disks.
1309
1310     @rtype: tuple of three items
1311     @return: a tuple of (dict of node-to-node_error, list of instances
1312         which need activate-disks, dict of instance: (node, volume) for
1313         missing volumes
1314
1315     """
1316     result = res_nodes, res_instances, res_missing = {}, [], {}
1317
1318     vg_name = self.cfg.GetVGName()
1319     nodes = utils.NiceSort(self.cfg.GetNodeList())
1320     instances = [self.cfg.GetInstanceInfo(name)
1321                  for name in self.cfg.GetInstanceList()]
1322
1323     nv_dict = {}
1324     for inst in instances:
1325       inst_lvs = {}
1326       if (not inst.admin_up or
1327           inst.disk_template not in constants.DTS_NET_MIRROR):
1328         continue
1329       inst.MapLVsByNode(inst_lvs)
1330       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1331       for node, vol_list in inst_lvs.iteritems():
1332         for vol in vol_list:
1333           nv_dict[(node, vol)] = inst
1334
1335     if not nv_dict:
1336       return result
1337
1338     node_lvs = self.rpc.call_lv_list(nodes, vg_name)
1339
1340     to_act = set()
1341     for node in nodes:
1342       # node_volume
1343       node_res = node_lvs[node]
1344       if node_res.offline:
1345         continue
1346       msg = node_res.fail_msg
1347       if msg:
1348         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1349         res_nodes[node] = msg
1350         continue
1351
1352       lvs = node_res.payload
1353       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1354         inst = nv_dict.pop((node, lv_name), None)
1355         if (not lv_online and inst is not None
1356             and inst.name not in res_instances):
1357           res_instances.append(inst.name)
1358
1359     # any leftover items in nv_dict are missing LVs, let's arrange the
1360     # data better
1361     for key, inst in nv_dict.iteritems():
1362       if inst.name not in res_missing:
1363         res_missing[inst.name] = []
1364       res_missing[inst.name].append(key)
1365
1366     return result
1367
1368
1369 class LURenameCluster(LogicalUnit):
1370   """Rename the cluster.
1371
1372   """
1373   HPATH = "cluster-rename"
1374   HTYPE = constants.HTYPE_CLUSTER
1375   _OP_REQP = ["name"]
1376
1377   def BuildHooksEnv(self):
1378     """Build hooks env.
1379
1380     """
1381     env = {
1382       "OP_TARGET": self.cfg.GetClusterName(),
1383       "NEW_NAME": self.op.name,
1384       }
1385     mn = self.cfg.GetMasterNode()
1386     return env, [mn], [mn]
1387
1388   def CheckPrereq(self):
1389     """Verify that the passed name is a valid one.
1390
1391     """
1392     hostname = utils.HostInfo(self.op.name)
1393
1394     new_name = hostname.name
1395     self.ip = new_ip = hostname.ip
1396     old_name = self.cfg.GetClusterName()
1397     old_ip = self.cfg.GetMasterIP()
1398     if new_name == old_name and new_ip == old_ip:
1399       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1400                                  " cluster has changed")
1401     if new_ip != old_ip:
1402       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1403         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1404                                    " reachable on the network. Aborting." %
1405                                    new_ip)
1406
1407     self.op.name = new_name
1408
1409   def Exec(self, feedback_fn):
1410     """Rename the cluster.
1411
1412     """
1413     clustername = self.op.name
1414     ip = self.ip
1415
1416     # shutdown the master IP
1417     master = self.cfg.GetMasterNode()
1418     result = self.rpc.call_node_stop_master(master, False)
1419     result.Raise("Could not disable the master role")
1420
1421     try:
1422       cluster = self.cfg.GetClusterInfo()
1423       cluster.cluster_name = clustername
1424       cluster.master_ip = ip
1425       self.cfg.Update(cluster)
1426
1427       # update the known hosts file
1428       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1429       node_list = self.cfg.GetNodeList()
1430       try:
1431         node_list.remove(master)
1432       except ValueError:
1433         pass
1434       result = self.rpc.call_upload_file(node_list,
1435                                          constants.SSH_KNOWN_HOSTS_FILE)
1436       for to_node, to_result in result.iteritems():
1437         msg = to_result.fail_msg
1438         if msg:
1439           msg = ("Copy of file %s to node %s failed: %s" %
1440                  (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1441           self.proc.LogWarning(msg)
1442
1443     finally:
1444       result = self.rpc.call_node_start_master(master, False)
1445       msg = result.fail_msg
1446       if msg:
1447         self.LogWarning("Could not re-enable the master role on"
1448                         " the master, please restart manually: %s", msg)
1449
1450
1451 def _RecursiveCheckIfLVMBased(disk):
1452   """Check if the given disk or its children are lvm-based.
1453
1454   @type disk: L{objects.Disk}
1455   @param disk: the disk to check
1456   @rtype: booleean
1457   @return: boolean indicating whether a LD_LV dev_type was found or not
1458
1459   """
1460   if disk.children:
1461     for chdisk in disk.children:
1462       if _RecursiveCheckIfLVMBased(chdisk):
1463         return True
1464   return disk.dev_type == constants.LD_LV
1465
1466
1467 class LUSetClusterParams(LogicalUnit):
1468   """Change the parameters of the cluster.
1469
1470   """
1471   HPATH = "cluster-modify"
1472   HTYPE = constants.HTYPE_CLUSTER
1473   _OP_REQP = []
1474   REQ_BGL = False
1475
1476   def CheckArguments(self):
1477     """Check parameters
1478
1479     """
1480     if not hasattr(self.op, "candidate_pool_size"):
1481       self.op.candidate_pool_size = None
1482     if self.op.candidate_pool_size is not None:
1483       try:
1484         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1485       except (ValueError, TypeError), err:
1486         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1487                                    str(err))
1488       if self.op.candidate_pool_size < 1:
1489         raise errors.OpPrereqError("At least one master candidate needed")
1490
1491   def ExpandNames(self):
1492     # FIXME: in the future maybe other cluster params won't require checking on
1493     # all nodes to be modified.
1494     self.needed_locks = {
1495       locking.LEVEL_NODE: locking.ALL_SET,
1496     }
1497     self.share_locks[locking.LEVEL_NODE] = 1
1498
1499   def BuildHooksEnv(self):
1500     """Build hooks env.
1501
1502     """
1503     env = {
1504       "OP_TARGET": self.cfg.GetClusterName(),
1505       "NEW_VG_NAME": self.op.vg_name,
1506       }
1507     mn = self.cfg.GetMasterNode()
1508     return env, [mn], [mn]
1509
1510   def CheckPrereq(self):
1511     """Check prerequisites.
1512
1513     This checks whether the given params don't conflict and
1514     if the given volume group is valid.
1515
1516     """
1517     if self.op.vg_name is not None and not self.op.vg_name:
1518       instances = self.cfg.GetAllInstancesInfo().values()
1519       for inst in instances:
1520         for disk in inst.disks:
1521           if _RecursiveCheckIfLVMBased(disk):
1522             raise errors.OpPrereqError("Cannot disable lvm storage while"
1523                                        " lvm-based instances exist")
1524
1525     node_list = self.acquired_locks[locking.LEVEL_NODE]
1526
1527     # if vg_name not None, checks given volume group on all nodes
1528     if self.op.vg_name:
1529       vglist = self.rpc.call_vg_list(node_list)
1530       for node in node_list:
1531         msg = vglist[node].fail_msg
1532         if msg:
1533           # ignoring down node
1534           self.LogWarning("Error while gathering data on node %s"
1535                           " (ignoring node): %s", node, msg)
1536           continue
1537         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1538                                               self.op.vg_name,
1539                                               constants.MIN_VG_SIZE)
1540         if vgstatus:
1541           raise errors.OpPrereqError("Error on node '%s': %s" %
1542                                      (node, vgstatus))
1543
1544     self.cluster = cluster = self.cfg.GetClusterInfo()
1545     # validate params changes
1546     if self.op.beparams:
1547       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1548       self.new_beparams = objects.FillDict(
1549         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1550
1551     if self.op.nicparams:
1552       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1553       self.new_nicparams = objects.FillDict(
1554         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1555       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1556
1557     # hypervisor list/parameters
1558     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1559     if self.op.hvparams:
1560       if not isinstance(self.op.hvparams, dict):
1561         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1562       for hv_name, hv_dict in self.op.hvparams.items():
1563         if hv_name not in self.new_hvparams:
1564           self.new_hvparams[hv_name] = hv_dict
1565         else:
1566           self.new_hvparams[hv_name].update(hv_dict)
1567
1568     if self.op.enabled_hypervisors is not None:
1569       self.hv_list = self.op.enabled_hypervisors
1570     else:
1571       self.hv_list = cluster.enabled_hypervisors
1572
1573     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1574       # either the enabled list has changed, or the parameters have, validate
1575       for hv_name, hv_params in self.new_hvparams.items():
1576         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1577             (self.op.enabled_hypervisors and
1578              hv_name in self.op.enabled_hypervisors)):
1579           # either this is a new hypervisor, or its parameters have changed
1580           hv_class = hypervisor.GetHypervisor(hv_name)
1581           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1582           hv_class.CheckParameterSyntax(hv_params)
1583           _CheckHVParams(self, node_list, hv_name, hv_params)
1584
1585   def Exec(self, feedback_fn):
1586     """Change the parameters of the cluster.
1587
1588     """
1589     if self.op.vg_name is not None:
1590       new_volume = self.op.vg_name
1591       if not new_volume:
1592         new_volume = None
1593       if new_volume != self.cfg.GetVGName():
1594         self.cfg.SetVGName(new_volume)
1595       else:
1596         feedback_fn("Cluster LVM configuration already in desired"
1597                     " state, not changing")
1598     if self.op.hvparams:
1599       self.cluster.hvparams = self.new_hvparams
1600     if self.op.enabled_hypervisors is not None:
1601       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1602     if self.op.beparams:
1603       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1604     if self.op.nicparams:
1605       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1606
1607     if self.op.candidate_pool_size is not None:
1608       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1609
1610     self.cfg.Update(self.cluster)
1611
1612     # we want to update nodes after the cluster so that if any errors
1613     # happen, we have recorded and saved the cluster info
1614     if self.op.candidate_pool_size is not None:
1615       _AdjustCandidatePool(self)
1616
1617
1618 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1619   """Distribute additional files which are part of the cluster configuration.
1620
1621   ConfigWriter takes care of distributing the config and ssconf files, but
1622   there are more files which should be distributed to all nodes. This function
1623   makes sure those are copied.
1624
1625   @param lu: calling logical unit
1626   @param additional_nodes: list of nodes not in the config to distribute to
1627
1628   """
1629   # 1. Gather target nodes
1630   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1631   dist_nodes = lu.cfg.GetNodeList()
1632   if additional_nodes is not None:
1633     dist_nodes.extend(additional_nodes)
1634   if myself.name in dist_nodes:
1635     dist_nodes.remove(myself.name)
1636   # 2. Gather files to distribute
1637   dist_files = set([constants.ETC_HOSTS,
1638                     constants.SSH_KNOWN_HOSTS_FILE,
1639                     constants.RAPI_CERT_FILE,
1640                     constants.RAPI_USERS_FILE,
1641                    ])
1642
1643   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1644   for hv_name in enabled_hypervisors:
1645     hv_class = hypervisor.GetHypervisor(hv_name)
1646     dist_files.update(hv_class.GetAncillaryFiles())
1647
1648   # 3. Perform the files upload
1649   for fname in dist_files:
1650     if os.path.exists(fname):
1651       result = lu.rpc.call_upload_file(dist_nodes, fname)
1652       for to_node, to_result in result.items():
1653         msg = to_result.fail_msg
1654         if msg:
1655           msg = ("Copy of file %s to node %s failed: %s" %
1656                  (fname, to_node, msg))
1657           lu.proc.LogWarning(msg)
1658
1659
1660 class LURedistributeConfig(NoHooksLU):
1661   """Force the redistribution of cluster configuration.
1662
1663   This is a very simple LU.
1664
1665   """
1666   _OP_REQP = []
1667   REQ_BGL = False
1668
1669   def ExpandNames(self):
1670     self.needed_locks = {
1671       locking.LEVEL_NODE: locking.ALL_SET,
1672     }
1673     self.share_locks[locking.LEVEL_NODE] = 1
1674
1675   def CheckPrereq(self):
1676     """Check prerequisites.
1677
1678     """
1679
1680   def Exec(self, feedback_fn):
1681     """Redistribute the configuration.
1682
1683     """
1684     self.cfg.Update(self.cfg.GetClusterInfo())
1685     _RedistributeAncillaryFiles(self)
1686
1687
1688 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1689   """Sleep and poll for an instance's disk to sync.
1690
1691   """
1692   if not instance.disks:
1693     return True
1694
1695   if not oneshot:
1696     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1697
1698   node = instance.primary_node
1699
1700   for dev in instance.disks:
1701     lu.cfg.SetDiskID(dev, node)
1702
1703   retries = 0
1704   degr_retries = 10 # in seconds, as we sleep 1 second each time
1705   while True:
1706     max_time = 0
1707     done = True
1708     cumul_degraded = False
1709     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1710     msg = rstats.fail_msg
1711     if msg:
1712       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1713       retries += 1
1714       if retries >= 10:
1715         raise errors.RemoteError("Can't contact node %s for mirror data,"
1716                                  " aborting." % node)
1717       time.sleep(6)
1718       continue
1719     rstats = rstats.payload
1720     retries = 0
1721     for i, mstat in enumerate(rstats):
1722       if mstat is None:
1723         lu.LogWarning("Can't compute data for node %s/%s",
1724                            node, instance.disks[i].iv_name)
1725         continue
1726       # we ignore the ldisk parameter
1727       perc_done, est_time, is_degraded, _ = mstat
1728       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1729       if perc_done is not None:
1730         done = False
1731         if est_time is not None:
1732           rem_time = "%d estimated seconds remaining" % est_time
1733           max_time = est_time
1734         else:
1735           rem_time = "no time estimate"
1736         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1737                         (instance.disks[i].iv_name, perc_done, rem_time))
1738
1739     # if we're done but degraded, let's do a few small retries, to
1740     # make sure we see a stable and not transient situation; therefore
1741     # we force restart of the loop
1742     if (done or oneshot) and cumul_degraded and degr_retries > 0:
1743       logging.info("Degraded disks found, %d retries left", degr_retries)
1744       degr_retries -= 1
1745       time.sleep(1)
1746       continue
1747
1748     if done or oneshot:
1749       break
1750
1751     time.sleep(min(60, max_time))
1752
1753   if done:
1754     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1755   return not cumul_degraded
1756
1757
1758 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1759   """Check that mirrors are not degraded.
1760
1761   The ldisk parameter, if True, will change the test from the
1762   is_degraded attribute (which represents overall non-ok status for
1763   the device(s)) to the ldisk (representing the local storage status).
1764
1765   """
1766   lu.cfg.SetDiskID(dev, node)
1767   if ldisk:
1768     idx = 6
1769   else:
1770     idx = 5
1771
1772   result = True
1773   if on_primary or dev.AssembleOnSecondary():
1774     rstats = lu.rpc.call_blockdev_find(node, dev)
1775     msg = rstats.fail_msg
1776     if msg:
1777       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1778       result = False
1779     elif not rstats.payload:
1780       lu.LogWarning("Can't find disk on node %s", node)
1781       result = False
1782     else:
1783       result = result and (not rstats.payload[idx])
1784   if dev.children:
1785     for child in dev.children:
1786       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1787
1788   return result
1789
1790
1791 class LUDiagnoseOS(NoHooksLU):
1792   """Logical unit for OS diagnose/query.
1793
1794   """
1795   _OP_REQP = ["output_fields", "names"]
1796   REQ_BGL = False
1797   _FIELDS_STATIC = utils.FieldSet()
1798   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1799
1800   def ExpandNames(self):
1801     if self.op.names:
1802       raise errors.OpPrereqError("Selective OS query not supported")
1803
1804     _CheckOutputFields(static=self._FIELDS_STATIC,
1805                        dynamic=self._FIELDS_DYNAMIC,
1806                        selected=self.op.output_fields)
1807
1808     # Lock all nodes, in shared mode
1809     # Temporary removal of locks, should be reverted later
1810     # TODO: reintroduce locks when they are lighter-weight
1811     self.needed_locks = {}
1812     #self.share_locks[locking.LEVEL_NODE] = 1
1813     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1814
1815   def CheckPrereq(self):
1816     """Check prerequisites.
1817
1818     """
1819
1820   @staticmethod
1821   def _DiagnoseByOS(node_list, rlist):
1822     """Remaps a per-node return list into an a per-os per-node dictionary
1823
1824     @param node_list: a list with the names of all nodes
1825     @param rlist: a map with node names as keys and OS objects as values
1826
1827     @rtype: dict
1828     @return: a dictionary with osnames as keys and as value another map, with
1829         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1830
1831           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1832                                      (/srv/..., False, "invalid api")],
1833                            "node2": [(/srv/..., True, "")]}
1834           }
1835
1836     """
1837     all_os = {}
1838     # we build here the list of nodes that didn't fail the RPC (at RPC
1839     # level), so that nodes with a non-responding node daemon don't
1840     # make all OSes invalid
1841     good_nodes = [node_name for node_name in rlist
1842                   if not rlist[node_name].fail_msg]
1843     for node_name, nr in rlist.items():
1844       if nr.fail_msg or not nr.payload:
1845         continue
1846       for name, path, status, diagnose in nr.payload:
1847         if name not in all_os:
1848           # build a list of nodes for this os containing empty lists
1849           # for each node in node_list
1850           all_os[name] = {}
1851           for nname in good_nodes:
1852             all_os[name][nname] = []
1853         all_os[name][node_name].append((path, status, diagnose))
1854     return all_os
1855
1856   def Exec(self, feedback_fn):
1857     """Compute the list of OSes.
1858
1859     """
1860     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1861     node_data = self.rpc.call_os_diagnose(valid_nodes)
1862     pol = self._DiagnoseByOS(valid_nodes, node_data)
1863     output = []
1864     for os_name, os_data in pol.items():
1865       row = []
1866       for field in self.op.output_fields:
1867         if field == "name":
1868           val = os_name
1869         elif field == "valid":
1870           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1871         elif field == "node_status":
1872           # this is just a copy of the dict
1873           val = {}
1874           for node_name, nos_list in os_data.items():
1875             val[node_name] = nos_list
1876         else:
1877           raise errors.ParameterError(field)
1878         row.append(val)
1879       output.append(row)
1880
1881     return output
1882
1883
1884 class LURemoveNode(LogicalUnit):
1885   """Logical unit for removing a node.
1886
1887   """
1888   HPATH = "node-remove"
1889   HTYPE = constants.HTYPE_NODE
1890   _OP_REQP = ["node_name"]
1891
1892   def BuildHooksEnv(self):
1893     """Build hooks env.
1894
1895     This doesn't run on the target node in the pre phase as a failed
1896     node would then be impossible to remove.
1897
1898     """
1899     env = {
1900       "OP_TARGET": self.op.node_name,
1901       "NODE_NAME": self.op.node_name,
1902       }
1903     all_nodes = self.cfg.GetNodeList()
1904     all_nodes.remove(self.op.node_name)
1905     return env, all_nodes, all_nodes
1906
1907   def CheckPrereq(self):
1908     """Check prerequisites.
1909
1910     This checks:
1911      - the node exists in the configuration
1912      - it does not have primary or secondary instances
1913      - it's not the master
1914
1915     Any errors are signalled by raising errors.OpPrereqError.
1916
1917     """
1918     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1919     if node is None:
1920       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1921
1922     instance_list = self.cfg.GetInstanceList()
1923
1924     masternode = self.cfg.GetMasterNode()
1925     if node.name == masternode:
1926       raise errors.OpPrereqError("Node is the master node,"
1927                                  " you need to failover first.")
1928
1929     for instance_name in instance_list:
1930       instance = self.cfg.GetInstanceInfo(instance_name)
1931       if node.name in instance.all_nodes:
1932         raise errors.OpPrereqError("Instance %s is still running on the node,"
1933                                    " please remove first." % instance_name)
1934     self.op.node_name = node.name
1935     self.node = node
1936
1937   def Exec(self, feedback_fn):
1938     """Removes the node from the cluster.
1939
1940     """
1941     node = self.node
1942     logging.info("Stopping the node daemon and removing configs from node %s",
1943                  node.name)
1944
1945     self.context.RemoveNode(node.name)
1946
1947     result = self.rpc.call_node_leave_cluster(node.name)
1948     msg = result.fail_msg
1949     if msg:
1950       self.LogWarning("Errors encountered on the remote node while leaving"
1951                       " the cluster: %s", msg)
1952
1953     # Promote nodes to master candidate as needed
1954     _AdjustCandidatePool(self)
1955
1956
1957 class LUQueryNodes(NoHooksLU):
1958   """Logical unit for querying nodes.
1959
1960   """
1961   _OP_REQP = ["output_fields", "names", "use_locking"]
1962   REQ_BGL = False
1963   _FIELDS_DYNAMIC = utils.FieldSet(
1964     "dtotal", "dfree",
1965     "mtotal", "mnode", "mfree",
1966     "bootid",
1967     "ctotal", "cnodes", "csockets",
1968     )
1969
1970   _FIELDS_STATIC = utils.FieldSet(
1971     "name", "pinst_cnt", "sinst_cnt",
1972     "pinst_list", "sinst_list",
1973     "pip", "sip", "tags",
1974     "serial_no",
1975     "master_candidate",
1976     "master",
1977     "offline",
1978     "drained",
1979     )
1980
1981   def ExpandNames(self):
1982     _CheckOutputFields(static=self._FIELDS_STATIC,
1983                        dynamic=self._FIELDS_DYNAMIC,
1984                        selected=self.op.output_fields)
1985
1986     self.needed_locks = {}
1987     self.share_locks[locking.LEVEL_NODE] = 1
1988
1989     if self.op.names:
1990       self.wanted = _GetWantedNodes(self, self.op.names)
1991     else:
1992       self.wanted = locking.ALL_SET
1993
1994     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1995     self.do_locking = self.do_node_query and self.op.use_locking
1996     if self.do_locking:
1997       # if we don't request only static fields, we need to lock the nodes
1998       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1999
2000
2001   def CheckPrereq(self):
2002     """Check prerequisites.
2003
2004     """
2005     # The validation of the node list is done in the _GetWantedNodes,
2006     # if non empty, and if empty, there's no validation to do
2007     pass
2008
2009   def Exec(self, feedback_fn):
2010     """Computes the list of nodes and their attributes.
2011
2012     """
2013     all_info = self.cfg.GetAllNodesInfo()
2014     if self.do_locking:
2015       nodenames = self.acquired_locks[locking.LEVEL_NODE]
2016     elif self.wanted != locking.ALL_SET:
2017       nodenames = self.wanted
2018       missing = set(nodenames).difference(all_info.keys())
2019       if missing:
2020         raise errors.OpExecError(
2021           "Some nodes were removed before retrieving their data: %s" % missing)
2022     else:
2023       nodenames = all_info.keys()
2024
2025     nodenames = utils.NiceSort(nodenames)
2026     nodelist = [all_info[name] for name in nodenames]
2027
2028     # begin data gathering
2029
2030     if self.do_node_query:
2031       live_data = {}
2032       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2033                                           self.cfg.GetHypervisorType())
2034       for name in nodenames:
2035         nodeinfo = node_data[name]
2036         if not nodeinfo.fail_msg and nodeinfo.payload:
2037           nodeinfo = nodeinfo.payload
2038           fn = utils.TryConvert
2039           live_data[name] = {
2040             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2041             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2042             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2043             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2044             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2045             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2046             "bootid": nodeinfo.get('bootid', None),
2047             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2048             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2049             }
2050         else:
2051           live_data[name] = {}
2052     else:
2053       live_data = dict.fromkeys(nodenames, {})
2054
2055     node_to_primary = dict([(name, set()) for name in nodenames])
2056     node_to_secondary = dict([(name, set()) for name in nodenames])
2057
2058     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2059                              "sinst_cnt", "sinst_list"))
2060     if inst_fields & frozenset(self.op.output_fields):
2061       instancelist = self.cfg.GetInstanceList()
2062
2063       for instance_name in instancelist:
2064         inst = self.cfg.GetInstanceInfo(instance_name)
2065         if inst.primary_node in node_to_primary:
2066           node_to_primary[inst.primary_node].add(inst.name)
2067         for secnode in inst.secondary_nodes:
2068           if secnode in node_to_secondary:
2069             node_to_secondary[secnode].add(inst.name)
2070
2071     master_node = self.cfg.GetMasterNode()
2072
2073     # end data gathering
2074
2075     output = []
2076     for node in nodelist:
2077       node_output = []
2078       for field in self.op.output_fields:
2079         if field == "name":
2080           val = node.name
2081         elif field == "pinst_list":
2082           val = list(node_to_primary[node.name])
2083         elif field == "sinst_list":
2084           val = list(node_to_secondary[node.name])
2085         elif field == "pinst_cnt":
2086           val = len(node_to_primary[node.name])
2087         elif field == "sinst_cnt":
2088           val = len(node_to_secondary[node.name])
2089         elif field == "pip":
2090           val = node.primary_ip
2091         elif field == "sip":
2092           val = node.secondary_ip
2093         elif field == "tags":
2094           val = list(node.GetTags())
2095         elif field == "serial_no":
2096           val = node.serial_no
2097         elif field == "master_candidate":
2098           val = node.master_candidate
2099         elif field == "master":
2100           val = node.name == master_node
2101         elif field == "offline":
2102           val = node.offline
2103         elif field == "drained":
2104           val = node.drained
2105         elif self._FIELDS_DYNAMIC.Matches(field):
2106           val = live_data[node.name].get(field, None)
2107         else:
2108           raise errors.ParameterError(field)
2109         node_output.append(val)
2110       output.append(node_output)
2111
2112     return output
2113
2114
2115 class LUQueryNodeVolumes(NoHooksLU):
2116   """Logical unit for getting volumes on node(s).
2117
2118   """
2119   _OP_REQP = ["nodes", "output_fields"]
2120   REQ_BGL = False
2121   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2122   _FIELDS_STATIC = utils.FieldSet("node")
2123
2124   def ExpandNames(self):
2125     _CheckOutputFields(static=self._FIELDS_STATIC,
2126                        dynamic=self._FIELDS_DYNAMIC,
2127                        selected=self.op.output_fields)
2128
2129     self.needed_locks = {}
2130     self.share_locks[locking.LEVEL_NODE] = 1
2131     if not self.op.nodes:
2132       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2133     else:
2134       self.needed_locks[locking.LEVEL_NODE] = \
2135         _GetWantedNodes(self, self.op.nodes)
2136
2137   def CheckPrereq(self):
2138     """Check prerequisites.
2139
2140     This checks that the fields required are valid output fields.
2141
2142     """
2143     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2144
2145   def Exec(self, feedback_fn):
2146     """Computes the list of nodes and their attributes.
2147
2148     """
2149     nodenames = self.nodes
2150     volumes = self.rpc.call_node_volumes(nodenames)
2151
2152     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2153              in self.cfg.GetInstanceList()]
2154
2155     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2156
2157     output = []
2158     for node in nodenames:
2159       nresult = volumes[node]
2160       if nresult.offline:
2161         continue
2162       msg = nresult.fail_msg
2163       if msg:
2164         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2165         continue
2166
2167       node_vols = nresult.payload[:]
2168       node_vols.sort(key=lambda vol: vol['dev'])
2169
2170       for vol in node_vols:
2171         node_output = []
2172         for field in self.op.output_fields:
2173           if field == "node":
2174             val = node
2175           elif field == "phys":
2176             val = vol['dev']
2177           elif field == "vg":
2178             val = vol['vg']
2179           elif field == "name":
2180             val = vol['name']
2181           elif field == "size":
2182             val = int(float(vol['size']))
2183           elif field == "instance":
2184             for inst in ilist:
2185               if node not in lv_by_node[inst]:
2186                 continue
2187               if vol['name'] in lv_by_node[inst][node]:
2188                 val = inst.name
2189                 break
2190             else:
2191               val = '-'
2192           else:
2193             raise errors.ParameterError(field)
2194           node_output.append(str(val))
2195
2196         output.append(node_output)
2197
2198     return output
2199
2200
2201 class LUAddNode(LogicalUnit):
2202   """Logical unit for adding node to the cluster.
2203
2204   """
2205   HPATH = "node-add"
2206   HTYPE = constants.HTYPE_NODE
2207   _OP_REQP = ["node_name"]
2208
2209   def BuildHooksEnv(self):
2210     """Build hooks env.
2211
2212     This will run on all nodes before, and on all nodes + the new node after.
2213
2214     """
2215     env = {
2216       "OP_TARGET": self.op.node_name,
2217       "NODE_NAME": self.op.node_name,
2218       "NODE_PIP": self.op.primary_ip,
2219       "NODE_SIP": self.op.secondary_ip,
2220       }
2221     nodes_0 = self.cfg.GetNodeList()
2222     nodes_1 = nodes_0 + [self.op.node_name, ]
2223     return env, nodes_0, nodes_1
2224
2225   def CheckPrereq(self):
2226     """Check prerequisites.
2227
2228     This checks:
2229      - the new node is not already in the config
2230      - it is resolvable
2231      - its parameters (single/dual homed) matches the cluster
2232
2233     Any errors are signalled by raising errors.OpPrereqError.
2234
2235     """
2236     node_name = self.op.node_name
2237     cfg = self.cfg
2238
2239     dns_data = utils.HostInfo(node_name)
2240
2241     node = dns_data.name
2242     primary_ip = self.op.primary_ip = dns_data.ip
2243     secondary_ip = getattr(self.op, "secondary_ip", None)
2244     if secondary_ip is None:
2245       secondary_ip = primary_ip
2246     if not utils.IsValidIP(secondary_ip):
2247       raise errors.OpPrereqError("Invalid secondary IP given")
2248     self.op.secondary_ip = secondary_ip
2249
2250     node_list = cfg.GetNodeList()
2251     if not self.op.readd and node in node_list:
2252       raise errors.OpPrereqError("Node %s is already in the configuration" %
2253                                  node)
2254     elif self.op.readd and node not in node_list:
2255       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2256
2257     for existing_node_name in node_list:
2258       existing_node = cfg.GetNodeInfo(existing_node_name)
2259
2260       if self.op.readd and node == existing_node_name:
2261         if (existing_node.primary_ip != primary_ip or
2262             existing_node.secondary_ip != secondary_ip):
2263           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2264                                      " address configuration as before")
2265         continue
2266
2267       if (existing_node.primary_ip == primary_ip or
2268           existing_node.secondary_ip == primary_ip or
2269           existing_node.primary_ip == secondary_ip or
2270           existing_node.secondary_ip == secondary_ip):
2271         raise errors.OpPrereqError("New node ip address(es) conflict with"
2272                                    " existing node %s" % existing_node.name)
2273
2274     # check that the type of the node (single versus dual homed) is the
2275     # same as for the master
2276     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2277     master_singlehomed = myself.secondary_ip == myself.primary_ip
2278     newbie_singlehomed = secondary_ip == primary_ip
2279     if master_singlehomed != newbie_singlehomed:
2280       if master_singlehomed:
2281         raise errors.OpPrereqError("The master has no private ip but the"
2282                                    " new node has one")
2283       else:
2284         raise errors.OpPrereqError("The master has a private ip but the"
2285                                    " new node doesn't have one")
2286
2287     # checks reachablity
2288     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2289       raise errors.OpPrereqError("Node not reachable by ping")
2290
2291     if not newbie_singlehomed:
2292       # check reachability from my secondary ip to newbie's secondary ip
2293       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2294                            source=myself.secondary_ip):
2295         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2296                                    " based ping to noded port")
2297
2298     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2299     mc_now, _ = self.cfg.GetMasterCandidateStats()
2300     master_candidate = mc_now < cp_size
2301
2302     self.new_node = objects.Node(name=node,
2303                                  primary_ip=primary_ip,
2304                                  secondary_ip=secondary_ip,
2305                                  master_candidate=master_candidate,
2306                                  offline=False, drained=False)
2307
2308   def Exec(self, feedback_fn):
2309     """Adds the new node to the cluster.
2310
2311     """
2312     new_node = self.new_node
2313     node = new_node.name
2314
2315     # check connectivity
2316     result = self.rpc.call_version([node])[node]
2317     result.Raise("Can't get version information from node %s" % node)
2318     if constants.PROTOCOL_VERSION == result.payload:
2319       logging.info("Communication to node %s fine, sw version %s match",
2320                    node, result.payload)
2321     else:
2322       raise errors.OpExecError("Version mismatch master version %s,"
2323                                " node version %s" %
2324                                (constants.PROTOCOL_VERSION, result.payload))
2325
2326     # setup ssh on node
2327     logging.info("Copy ssh key to node %s", node)
2328     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2329     keyarray = []
2330     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2331                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2332                 priv_key, pub_key]
2333
2334     for i in keyfiles:
2335       f = open(i, 'r')
2336       try:
2337         keyarray.append(f.read())
2338       finally:
2339         f.close()
2340
2341     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2342                                     keyarray[2],
2343                                     keyarray[3], keyarray[4], keyarray[5])
2344     result.Raise("Cannot transfer ssh keys to the new node")
2345
2346     # Add node to our /etc/hosts, and add key to known_hosts
2347     if self.cfg.GetClusterInfo().modify_etc_hosts:
2348       utils.AddHostToEtcHosts(new_node.name)
2349
2350     if new_node.secondary_ip != new_node.primary_ip:
2351       result = self.rpc.call_node_has_ip_address(new_node.name,
2352                                                  new_node.secondary_ip)
2353       result.Raise("Failure checking secondary ip on node %s" % new_node.name,
2354                    prereq=True)
2355       if not result.payload:
2356         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2357                                  " you gave (%s). Please fix and re-run this"
2358                                  " command." % new_node.secondary_ip)
2359
2360     node_verify_list = [self.cfg.GetMasterNode()]
2361     node_verify_param = {
2362       'nodelist': [node],
2363       # TODO: do a node-net-test as well?
2364     }
2365
2366     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2367                                        self.cfg.GetClusterName())
2368     for verifier in node_verify_list:
2369       result[verifier].Raise("Cannot communicate with node %s" % verifier)
2370       nl_payload = result[verifier].payload['nodelist']
2371       if nl_payload:
2372         for failed in nl_payload:
2373           feedback_fn("ssh/hostname verification failed %s -> %s" %
2374                       (verifier, nl_payload[failed]))
2375         raise errors.OpExecError("ssh/hostname verification failed.")
2376
2377     if self.op.readd:
2378       _RedistributeAncillaryFiles(self)
2379       self.context.ReaddNode(new_node)
2380     else:
2381       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2382       self.context.AddNode(new_node)
2383
2384
2385 class LUSetNodeParams(LogicalUnit):
2386   """Modifies the parameters of a node.
2387
2388   """
2389   HPATH = "node-modify"
2390   HTYPE = constants.HTYPE_NODE
2391   _OP_REQP = ["node_name"]
2392   REQ_BGL = False
2393
2394   def CheckArguments(self):
2395     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2396     if node_name is None:
2397       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2398     self.op.node_name = node_name
2399     _CheckBooleanOpField(self.op, 'master_candidate')
2400     _CheckBooleanOpField(self.op, 'offline')
2401     _CheckBooleanOpField(self.op, 'drained')
2402     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2403     if all_mods.count(None) == 3:
2404       raise errors.OpPrereqError("Please pass at least one modification")
2405     if all_mods.count(True) > 1:
2406       raise errors.OpPrereqError("Can't set the node into more than one"
2407                                  " state at the same time")
2408
2409   def ExpandNames(self):
2410     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2411
2412   def BuildHooksEnv(self):
2413     """Build hooks env.
2414
2415     This runs on the master node.
2416
2417     """
2418     env = {
2419       "OP_TARGET": self.op.node_name,
2420       "MASTER_CANDIDATE": str(self.op.master_candidate),
2421       "OFFLINE": str(self.op.offline),
2422       "DRAINED": str(self.op.drained),
2423       }
2424     nl = [self.cfg.GetMasterNode(),
2425           self.op.node_name]
2426     return env, nl, nl
2427
2428   def CheckPrereq(self):
2429     """Check prerequisites.
2430
2431     This only checks the instance list against the existing names.
2432
2433     """
2434     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2435
2436     if ((self.op.master_candidate == False or self.op.offline == True or
2437          self.op.drained == True) and node.master_candidate):
2438       # we will demote the node from master_candidate
2439       if self.op.node_name == self.cfg.GetMasterNode():
2440         raise errors.OpPrereqError("The master node has to be a"
2441                                    " master candidate, online and not drained")
2442       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2443       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2444       if num_candidates <= cp_size:
2445         msg = ("Not enough master candidates (desired"
2446                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2447         if self.op.force:
2448           self.LogWarning(msg)
2449         else:
2450           raise errors.OpPrereqError(msg)
2451
2452     if (self.op.master_candidate == True and
2453         ((node.offline and not self.op.offline == False) or
2454          (node.drained and not self.op.drained == False))):
2455       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2456                                  " to master_candidate" % node.name)
2457
2458     return
2459
2460   def Exec(self, feedback_fn):
2461     """Modifies a node.
2462
2463     """
2464     node = self.node
2465
2466     result = []
2467     changed_mc = False
2468
2469     if self.op.offline is not None:
2470       node.offline = self.op.offline
2471       result.append(("offline", str(self.op.offline)))
2472       if self.op.offline == True:
2473         if node.master_candidate:
2474           node.master_candidate = False
2475           changed_mc = True
2476           result.append(("master_candidate", "auto-demotion due to offline"))
2477         if node.drained:
2478           node.drained = False
2479           result.append(("drained", "clear drained status due to offline"))
2480
2481     if self.op.master_candidate is not None:
2482       node.master_candidate = self.op.master_candidate
2483       changed_mc = True
2484       result.append(("master_candidate", str(self.op.master_candidate)))
2485       if self.op.master_candidate == False:
2486         rrc = self.rpc.call_node_demote_from_mc(node.name)
2487         msg = rrc.fail_msg
2488         if msg:
2489           self.LogWarning("Node failed to demote itself: %s" % msg)
2490
2491     if self.op.drained is not None:
2492       node.drained = self.op.drained
2493       result.append(("drained", str(self.op.drained)))
2494       if self.op.drained == True:
2495         if node.master_candidate:
2496           node.master_candidate = False
2497           changed_mc = True
2498           result.append(("master_candidate", "auto-demotion due to drain"))
2499         if node.offline:
2500           node.offline = False
2501           result.append(("offline", "clear offline status due to drain"))
2502
2503     # this will trigger configuration file update, if needed
2504     self.cfg.Update(node)
2505     # this will trigger job queue propagation or cleanup
2506     if changed_mc:
2507       self.context.ReaddNode(node)
2508
2509     return result
2510
2511
2512 class LUPowercycleNode(NoHooksLU):
2513   """Powercycles a node.
2514
2515   """
2516   _OP_REQP = ["node_name", "force"]
2517   REQ_BGL = False
2518
2519   def CheckArguments(self):
2520     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2521     if node_name is None:
2522       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2523     self.op.node_name = node_name
2524     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2525       raise errors.OpPrereqError("The node is the master and the force"
2526                                  " parameter was not set")
2527
2528   def ExpandNames(self):
2529     """Locking for PowercycleNode.
2530
2531     This is a last-resource option and shouldn't block on other
2532     jobs. Therefore, we grab no locks.
2533
2534     """
2535     self.needed_locks = {}
2536
2537   def CheckPrereq(self):
2538     """Check prerequisites.
2539
2540     This LU has no prereqs.
2541
2542     """
2543     pass
2544
2545   def Exec(self, feedback_fn):
2546     """Reboots a node.
2547
2548     """
2549     result = self.rpc.call_node_powercycle(self.op.node_name,
2550                                            self.cfg.GetHypervisorType())
2551     result.Raise("Failed to schedule the reboot")
2552     return result.payload
2553
2554
2555 class LUQueryClusterInfo(NoHooksLU):
2556   """Query cluster configuration.
2557
2558   """
2559   _OP_REQP = []
2560   REQ_BGL = False
2561
2562   def ExpandNames(self):
2563     self.needed_locks = {}
2564
2565   def CheckPrereq(self):
2566     """No prerequsites needed for this LU.
2567
2568     """
2569     pass
2570
2571   def Exec(self, feedback_fn):
2572     """Return cluster config.
2573
2574     """
2575     cluster = self.cfg.GetClusterInfo()
2576     result = {
2577       "software_version": constants.RELEASE_VERSION,
2578       "protocol_version": constants.PROTOCOL_VERSION,
2579       "config_version": constants.CONFIG_VERSION,
2580       "os_api_version": max(constants.OS_API_VERSIONS),
2581       "export_version": constants.EXPORT_VERSION,
2582       "architecture": (platform.architecture()[0], platform.machine()),
2583       "name": cluster.cluster_name,
2584       "master": cluster.master_node,
2585       "default_hypervisor": cluster.default_hypervisor,
2586       "enabled_hypervisors": cluster.enabled_hypervisors,
2587       "hvparams": dict([(hvname, cluster.hvparams[hvname])
2588                         for hvname in cluster.enabled_hypervisors]),
2589       "beparams": cluster.beparams,
2590       "nicparams": cluster.nicparams,
2591       "candidate_pool_size": cluster.candidate_pool_size,
2592       "master_netdev": cluster.master_netdev,
2593       "volume_group_name": cluster.volume_group_name,
2594       "file_storage_dir": cluster.file_storage_dir,
2595       }
2596
2597     return result
2598
2599
2600 class LUQueryConfigValues(NoHooksLU):
2601   """Return configuration values.
2602
2603   """
2604   _OP_REQP = []
2605   REQ_BGL = False
2606   _FIELDS_DYNAMIC = utils.FieldSet()
2607   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2608
2609   def ExpandNames(self):
2610     self.needed_locks = {}
2611
2612     _CheckOutputFields(static=self._FIELDS_STATIC,
2613                        dynamic=self._FIELDS_DYNAMIC,
2614                        selected=self.op.output_fields)
2615
2616   def CheckPrereq(self):
2617     """No prerequisites.
2618
2619     """
2620     pass
2621
2622   def Exec(self, feedback_fn):
2623     """Dump a representation of the cluster config to the standard output.
2624
2625     """
2626     values = []
2627     for field in self.op.output_fields:
2628       if field == "cluster_name":
2629         entry = self.cfg.GetClusterName()
2630       elif field == "master_node":
2631         entry = self.cfg.GetMasterNode()
2632       elif field == "drain_flag":
2633         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2634       else:
2635         raise errors.ParameterError(field)
2636       values.append(entry)
2637     return values
2638
2639
2640 class LUActivateInstanceDisks(NoHooksLU):
2641   """Bring up an instance's disks.
2642
2643   """
2644   _OP_REQP = ["instance_name"]
2645   REQ_BGL = False
2646
2647   def ExpandNames(self):
2648     self._ExpandAndLockInstance()
2649     self.needed_locks[locking.LEVEL_NODE] = []
2650     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2651
2652   def DeclareLocks(self, level):
2653     if level == locking.LEVEL_NODE:
2654       self._LockInstancesNodes()
2655
2656   def CheckPrereq(self):
2657     """Check prerequisites.
2658
2659     This checks that the instance is in the cluster.
2660
2661     """
2662     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2663     assert self.instance is not None, \
2664       "Cannot retrieve locked instance %s" % self.op.instance_name
2665     _CheckNodeOnline(self, self.instance.primary_node)
2666
2667   def Exec(self, feedback_fn):
2668     """Activate the disks.
2669
2670     """
2671     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2672     if not disks_ok:
2673       raise errors.OpExecError("Cannot activate block devices")
2674
2675     return disks_info
2676
2677
2678 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2679   """Prepare the block devices for an instance.
2680
2681   This sets up the block devices on all nodes.
2682
2683   @type lu: L{LogicalUnit}
2684   @param lu: the logical unit on whose behalf we execute
2685   @type instance: L{objects.Instance}
2686   @param instance: the instance for whose disks we assemble
2687   @type ignore_secondaries: boolean
2688   @param ignore_secondaries: if true, errors on secondary nodes
2689       won't result in an error return from the function
2690   @return: False if the operation failed, otherwise a list of
2691       (host, instance_visible_name, node_visible_name)
2692       with the mapping from node devices to instance devices
2693
2694   """
2695   device_info = []
2696   disks_ok = True
2697   iname = instance.name
2698   # With the two passes mechanism we try to reduce the window of
2699   # opportunity for the race condition of switching DRBD to primary
2700   # before handshaking occured, but we do not eliminate it
2701
2702   # The proper fix would be to wait (with some limits) until the
2703   # connection has been made and drbd transitions from WFConnection
2704   # into any other network-connected state (Connected, SyncTarget,
2705   # SyncSource, etc.)
2706
2707   # 1st pass, assemble on all nodes in secondary mode
2708   for inst_disk in instance.disks:
2709     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2710       lu.cfg.SetDiskID(node_disk, node)
2711       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2712       msg = result.fail_msg
2713       if msg:
2714         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2715                            " (is_primary=False, pass=1): %s",
2716                            inst_disk.iv_name, node, msg)
2717         if not ignore_secondaries:
2718           disks_ok = False
2719
2720   # FIXME: race condition on drbd migration to primary
2721
2722   # 2nd pass, do only the primary node
2723   for inst_disk in instance.disks:
2724     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2725       if node != instance.primary_node:
2726         continue
2727       lu.cfg.SetDiskID(node_disk, node)
2728       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2729       msg = result.fail_msg
2730       if msg:
2731         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2732                            " (is_primary=True, pass=2): %s",
2733                            inst_disk.iv_name, node, msg)
2734         disks_ok = False
2735     device_info.append((instance.primary_node, inst_disk.iv_name,
2736                         result.payload))
2737
2738   # leave the disks configured for the primary node
2739   # this is a workaround that would be fixed better by
2740   # improving the logical/physical id handling
2741   for disk in instance.disks:
2742     lu.cfg.SetDiskID(disk, instance.primary_node)
2743
2744   return disks_ok, device_info
2745
2746
2747 def _StartInstanceDisks(lu, instance, force):
2748   """Start the disks of an instance.
2749
2750   """
2751   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2752                                            ignore_secondaries=force)
2753   if not disks_ok:
2754     _ShutdownInstanceDisks(lu, instance)
2755     if force is not None and not force:
2756       lu.proc.LogWarning("", hint="If the message above refers to a"
2757                          " secondary node,"
2758                          " you can retry the operation using '--force'.")
2759     raise errors.OpExecError("Disk consistency error")
2760
2761
2762 class LUDeactivateInstanceDisks(NoHooksLU):
2763   """Shutdown an instance's disks.
2764
2765   """
2766   _OP_REQP = ["instance_name"]
2767   REQ_BGL = False
2768
2769   def ExpandNames(self):
2770     self._ExpandAndLockInstance()
2771     self.needed_locks[locking.LEVEL_NODE] = []
2772     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2773
2774   def DeclareLocks(self, level):
2775     if level == locking.LEVEL_NODE:
2776       self._LockInstancesNodes()
2777
2778   def CheckPrereq(self):
2779     """Check prerequisites.
2780
2781     This checks that the instance is in the cluster.
2782
2783     """
2784     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2785     assert self.instance is not None, \
2786       "Cannot retrieve locked instance %s" % self.op.instance_name
2787
2788   def Exec(self, feedback_fn):
2789     """Deactivate the disks
2790
2791     """
2792     instance = self.instance
2793     _SafeShutdownInstanceDisks(self, instance)
2794
2795
2796 def _SafeShutdownInstanceDisks(lu, instance):
2797   """Shutdown block devices of an instance.
2798
2799   This function checks if an instance is running, before calling
2800   _ShutdownInstanceDisks.
2801
2802   """
2803   pnode = instance.primary_node
2804   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
2805   ins_l.Raise("Can't contact node %s" % pnode)
2806
2807   if instance.name in ins_l.payload:
2808     raise errors.OpExecError("Instance is running, can't shutdown"
2809                              " block devices.")
2810
2811   _ShutdownInstanceDisks(lu, instance)
2812
2813
2814 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2815   """Shutdown block devices of an instance.
2816
2817   This does the shutdown on all nodes of the instance.
2818
2819   If the ignore_primary is false, errors on the primary node are
2820   ignored.
2821
2822   """
2823   all_result = True
2824   for disk in instance.disks:
2825     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2826       lu.cfg.SetDiskID(top_disk, node)
2827       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2828       msg = result.fail_msg
2829       if msg:
2830         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2831                       disk.iv_name, node, msg)
2832         if not ignore_primary or node != instance.primary_node:
2833           all_result = False
2834   return all_result
2835
2836
2837 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2838   """Checks if a node has enough free memory.
2839
2840   This function check if a given node has the needed amount of free
2841   memory. In case the node has less memory or we cannot get the
2842   information from the node, this function raise an OpPrereqError
2843   exception.
2844
2845   @type lu: C{LogicalUnit}
2846   @param lu: a logical unit from which we get configuration data
2847   @type node: C{str}
2848   @param node: the node to check
2849   @type reason: C{str}
2850   @param reason: string to use in the error message
2851   @type requested: C{int}
2852   @param requested: the amount of memory in MiB to check for
2853   @type hypervisor_name: C{str}
2854   @param hypervisor_name: the hypervisor to ask for memory stats
2855   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2856       we cannot check the node
2857
2858   """
2859   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2860   nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True)
2861   free_mem = nodeinfo[node].payload.get('memory_free', None)
2862   if not isinstance(free_mem, int):
2863     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2864                                " was '%s'" % (node, free_mem))
2865   if requested > free_mem:
2866     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2867                                " needed %s MiB, available %s MiB" %
2868                                (node, reason, requested, free_mem))
2869
2870
2871 class LUStartupInstance(LogicalUnit):
2872   """Starts an instance.
2873
2874   """
2875   HPATH = "instance-start"
2876   HTYPE = constants.HTYPE_INSTANCE
2877   _OP_REQP = ["instance_name", "force"]
2878   REQ_BGL = False
2879
2880   def ExpandNames(self):
2881     self._ExpandAndLockInstance()
2882
2883   def BuildHooksEnv(self):
2884     """Build hooks env.
2885
2886     This runs on master, primary and secondary nodes of the instance.
2887
2888     """
2889     env = {
2890       "FORCE": self.op.force,
2891       }
2892     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2893     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2894     return env, nl, nl
2895
2896   def CheckPrereq(self):
2897     """Check prerequisites.
2898
2899     This checks that the instance is in the cluster.
2900
2901     """
2902     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2903     assert self.instance is not None, \
2904       "Cannot retrieve locked instance %s" % self.op.instance_name
2905
2906     # extra beparams
2907     self.beparams = getattr(self.op, "beparams", {})
2908     if self.beparams:
2909       if not isinstance(self.beparams, dict):
2910         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2911                                    " dict" % (type(self.beparams), ))
2912       # fill the beparams dict
2913       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2914       self.op.beparams = self.beparams
2915
2916     # extra hvparams
2917     self.hvparams = getattr(self.op, "hvparams", {})
2918     if self.hvparams:
2919       if not isinstance(self.hvparams, dict):
2920         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2921                                    " dict" % (type(self.hvparams), ))
2922
2923       # check hypervisor parameter syntax (locally)
2924       cluster = self.cfg.GetClusterInfo()
2925       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2926       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2927                                     instance.hvparams)
2928       filled_hvp.update(self.hvparams)
2929       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2930       hv_type.CheckParameterSyntax(filled_hvp)
2931       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2932       self.op.hvparams = self.hvparams
2933
2934     _CheckNodeOnline(self, instance.primary_node)
2935
2936     bep = self.cfg.GetClusterInfo().FillBE(instance)
2937     # check bridges existance
2938     _CheckInstanceBridgesExist(self, instance)
2939
2940     remote_info = self.rpc.call_instance_info(instance.primary_node,
2941                                               instance.name,
2942                                               instance.hypervisor)
2943     remote_info.Raise("Error checking node %s" % instance.primary_node,
2944                       prereq=True)
2945     if not remote_info.payload: # not running already
2946       _CheckNodeFreeMemory(self, instance.primary_node,
2947                            "starting instance %s" % instance.name,
2948                            bep[constants.BE_MEMORY], instance.hypervisor)
2949
2950   def Exec(self, feedback_fn):
2951     """Start the instance.
2952
2953     """
2954     instance = self.instance
2955     force = self.op.force
2956
2957     self.cfg.MarkInstanceUp(instance.name)
2958
2959     node_current = instance.primary_node
2960
2961     _StartInstanceDisks(self, instance, force)
2962
2963     result = self.rpc.call_instance_start(node_current, instance,
2964                                           self.hvparams, self.beparams)
2965     msg = result.fail_msg
2966     if msg:
2967       _ShutdownInstanceDisks(self, instance)
2968       raise errors.OpExecError("Could not start instance: %s" % msg)
2969
2970
2971 class LURebootInstance(LogicalUnit):
2972   """Reboot an instance.
2973
2974   """
2975   HPATH = "instance-reboot"
2976   HTYPE = constants.HTYPE_INSTANCE
2977   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2978   REQ_BGL = False
2979
2980   def ExpandNames(self):
2981     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2982                                    constants.INSTANCE_REBOOT_HARD,
2983                                    constants.INSTANCE_REBOOT_FULL]:
2984       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2985                                   (constants.INSTANCE_REBOOT_SOFT,
2986                                    constants.INSTANCE_REBOOT_HARD,
2987                                    constants.INSTANCE_REBOOT_FULL))
2988     self._ExpandAndLockInstance()
2989
2990   def BuildHooksEnv(self):
2991     """Build hooks env.
2992
2993     This runs on master, primary and secondary nodes of the instance.
2994
2995     """
2996     env = {
2997       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2998       "REBOOT_TYPE": self.op.reboot_type,
2999       }
3000     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3001     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3002     return env, nl, nl
3003
3004   def CheckPrereq(self):
3005     """Check prerequisites.
3006
3007     This checks that the instance is in the cluster.
3008
3009     """
3010     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3011     assert self.instance is not None, \
3012       "Cannot retrieve locked instance %s" % self.op.instance_name
3013
3014     _CheckNodeOnline(self, instance.primary_node)
3015
3016     # check bridges existance
3017     _CheckInstanceBridgesExist(self, instance)
3018
3019   def Exec(self, feedback_fn):
3020     """Reboot the instance.
3021
3022     """
3023     instance = self.instance
3024     ignore_secondaries = self.op.ignore_secondaries
3025     reboot_type = self.op.reboot_type
3026
3027     node_current = instance.primary_node
3028
3029     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3030                        constants.INSTANCE_REBOOT_HARD]:
3031       for disk in instance.disks:
3032         self.cfg.SetDiskID(disk, node_current)
3033       result = self.rpc.call_instance_reboot(node_current, instance,
3034                                              reboot_type)
3035       result.Raise("Could not reboot instance")
3036     else:
3037       result = self.rpc.call_instance_shutdown(node_current, instance)
3038       result.Raise("Could not shutdown instance for full reboot")
3039       _ShutdownInstanceDisks(self, instance)
3040       _StartInstanceDisks(self, instance, ignore_secondaries)
3041       result = self.rpc.call_instance_start(node_current, instance, None, None)
3042       msg = result.fail_msg
3043       if msg:
3044         _ShutdownInstanceDisks(self, instance)
3045         raise errors.OpExecError("Could not start instance for"
3046                                  " full reboot: %s" % msg)
3047
3048     self.cfg.MarkInstanceUp(instance.name)
3049
3050
3051 class LUShutdownInstance(LogicalUnit):
3052   """Shutdown an instance.
3053
3054   """
3055   HPATH = "instance-stop"
3056   HTYPE = constants.HTYPE_INSTANCE
3057   _OP_REQP = ["instance_name"]
3058   REQ_BGL = False
3059
3060   def ExpandNames(self):
3061     self._ExpandAndLockInstance()
3062
3063   def BuildHooksEnv(self):
3064     """Build hooks env.
3065
3066     This runs on master, primary and secondary nodes of the instance.
3067
3068     """
3069     env = _BuildInstanceHookEnvByObject(self, self.instance)
3070     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3071     return env, nl, nl
3072
3073   def CheckPrereq(self):
3074     """Check prerequisites.
3075
3076     This checks that the instance is in the cluster.
3077
3078     """
3079     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3080     assert self.instance is not None, \
3081       "Cannot retrieve locked instance %s" % self.op.instance_name
3082     _CheckNodeOnline(self, self.instance.primary_node)
3083
3084   def Exec(self, feedback_fn):
3085     """Shutdown the instance.
3086
3087     """
3088     instance = self.instance
3089     node_current = instance.primary_node
3090     self.cfg.MarkInstanceDown(instance.name)
3091     result = self.rpc.call_instance_shutdown(node_current, instance)
3092     msg = result.fail_msg
3093     if msg:
3094       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3095
3096     _ShutdownInstanceDisks(self, instance)
3097
3098
3099 class LUReinstallInstance(LogicalUnit):
3100   """Reinstall an instance.
3101
3102   """
3103   HPATH = "instance-reinstall"
3104   HTYPE = constants.HTYPE_INSTANCE
3105   _OP_REQP = ["instance_name"]
3106   REQ_BGL = False
3107
3108   def ExpandNames(self):
3109     self._ExpandAndLockInstance()
3110
3111   def BuildHooksEnv(self):
3112     """Build hooks env.
3113
3114     This runs on master, primary and secondary nodes of the instance.
3115
3116     """
3117     env = _BuildInstanceHookEnvByObject(self, self.instance)
3118     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3119     return env, nl, nl
3120
3121   def CheckPrereq(self):
3122     """Check prerequisites.
3123
3124     This checks that the instance is in the cluster and is not running.
3125
3126     """
3127     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3128     assert instance is not None, \
3129       "Cannot retrieve locked instance %s" % self.op.instance_name
3130     _CheckNodeOnline(self, instance.primary_node)
3131
3132     if instance.disk_template == constants.DT_DISKLESS:
3133       raise errors.OpPrereqError("Instance '%s' has no disks" %
3134                                  self.op.instance_name)
3135     if instance.admin_up:
3136       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3137                                  self.op.instance_name)
3138     remote_info = self.rpc.call_instance_info(instance.primary_node,
3139                                               instance.name,
3140                                               instance.hypervisor)
3141     remote_info.Raise("Error checking node %s" % instance.primary_node,
3142                       prereq=True)
3143     if remote_info.payload:
3144       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3145                                  (self.op.instance_name,
3146                                   instance.primary_node))
3147
3148     self.op.os_type = getattr(self.op, "os_type", None)
3149     if self.op.os_type is not None:
3150       # OS verification
3151       pnode = self.cfg.GetNodeInfo(
3152         self.cfg.ExpandNodeName(instance.primary_node))
3153       if pnode is None:
3154         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3155                                    self.op.pnode)
3156       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3157       result.Raise("OS '%s' not in supported OS list for primary node %s" %
3158                    (self.op.os_type, pnode.name), prereq=True)
3159
3160     self.instance = instance
3161
3162   def Exec(self, feedback_fn):
3163     """Reinstall the instance.
3164
3165     """
3166     inst = self.instance
3167
3168     if self.op.os_type is not None:
3169       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3170       inst.os = self.op.os_type
3171       self.cfg.Update(inst)
3172
3173     _StartInstanceDisks(self, inst, None)
3174     try:
3175       feedback_fn("Running the instance OS create scripts...")
3176       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3177       result.Raise("Could not install OS for instance %s on node %s" %
3178                    (inst.name, inst.primary_node))
3179     finally:
3180       _ShutdownInstanceDisks(self, inst)
3181
3182
3183 class LURenameInstance(LogicalUnit):
3184   """Rename an instance.
3185
3186   """
3187   HPATH = "instance-rename"
3188   HTYPE = constants.HTYPE_INSTANCE
3189   _OP_REQP = ["instance_name", "new_name"]
3190
3191   def BuildHooksEnv(self):
3192     """Build hooks env.
3193
3194     This runs on master, primary and secondary nodes of the instance.
3195
3196     """
3197     env = _BuildInstanceHookEnvByObject(self, self.instance)
3198     env["INSTANCE_NEW_NAME"] = self.op.new_name
3199     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3200     return env, nl, nl
3201
3202   def CheckPrereq(self):
3203     """Check prerequisites.
3204
3205     This checks that the instance is in the cluster and is not running.
3206
3207     """
3208     instance = self.cfg.GetInstanceInfo(
3209       self.cfg.ExpandInstanceName(self.op.instance_name))
3210     if instance is None:
3211       raise errors.OpPrereqError("Instance '%s' not known" %
3212                                  self.op.instance_name)
3213     _CheckNodeOnline(self, instance.primary_node)
3214
3215     if instance.admin_up:
3216       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3217                                  self.op.instance_name)
3218     remote_info = self.rpc.call_instance_info(instance.primary_node,
3219                                               instance.name,
3220                                               instance.hypervisor)
3221     remote_info.Raise("Error checking node %s" % instance.primary_node,
3222                       prereq=True)
3223     if remote_info.payload:
3224       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3225                                  (self.op.instance_name,
3226                                   instance.primary_node))
3227     self.instance = instance
3228
3229     # new name verification
3230     name_info = utils.HostInfo(self.op.new_name)
3231
3232     self.op.new_name = new_name = name_info.name
3233     instance_list = self.cfg.GetInstanceList()
3234     if new_name in instance_list:
3235       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3236                                  new_name)
3237
3238     if not getattr(self.op, "ignore_ip", False):
3239       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3240         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3241                                    (name_info.ip, new_name))
3242
3243
3244   def Exec(self, feedback_fn):
3245     """Reinstall the instance.
3246
3247     """
3248     inst = self.instance
3249     old_name = inst.name
3250
3251     if inst.disk_template == constants.DT_FILE:
3252       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3253
3254     self.cfg.RenameInstance(inst.name, self.op.new_name)
3255     # Change the instance lock. This is definitely safe while we hold the BGL
3256     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3257     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3258
3259     # re-read the instance from the configuration after rename
3260     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3261
3262     if inst.disk_template == constants.DT_FILE:
3263       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3264       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3265                                                      old_file_storage_dir,
3266                                                      new_file_storage_dir)
3267       result.Raise("Could not rename on node %s directory '%s' to '%s'"
3268                    " (but the instance has been renamed in Ganeti)" %
3269                    (inst.primary_node, old_file_storage_dir,
3270                     new_file_storage_dir))
3271
3272     _StartInstanceDisks(self, inst, None)
3273     try:
3274       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3275                                                  old_name)
3276       msg = result.fail_msg
3277       if msg:
3278         msg = ("Could not run OS rename script for instance %s on node %s"
3279                " (but the instance has been renamed in Ganeti): %s" %
3280                (inst.name, inst.primary_node, msg))
3281         self.proc.LogWarning(msg)
3282     finally:
3283       _ShutdownInstanceDisks(self, inst)
3284
3285
3286 class LURemoveInstance(LogicalUnit):
3287   """Remove an instance.
3288
3289   """
3290   HPATH = "instance-remove"
3291   HTYPE = constants.HTYPE_INSTANCE
3292   _OP_REQP = ["instance_name", "ignore_failures"]
3293   REQ_BGL = False
3294
3295   def ExpandNames(self):
3296     self._ExpandAndLockInstance()
3297     self.needed_locks[locking.LEVEL_NODE] = []
3298     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3299
3300   def DeclareLocks(self, level):
3301     if level == locking.LEVEL_NODE:
3302       self._LockInstancesNodes()
3303
3304   def BuildHooksEnv(self):
3305     """Build hooks env.
3306
3307     This runs on master, primary and secondary nodes of the instance.
3308
3309     """
3310     env = _BuildInstanceHookEnvByObject(self, self.instance)
3311     nl = [self.cfg.GetMasterNode()]
3312     return env, nl, nl
3313
3314   def CheckPrereq(self):
3315     """Check prerequisites.
3316
3317     This checks that the instance is in the cluster.
3318
3319     """
3320     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3321     assert self.instance is not None, \
3322       "Cannot retrieve locked instance %s" % self.op.instance_name
3323
3324   def Exec(self, feedback_fn):
3325     """Remove the instance.
3326
3327     """
3328     instance = self.instance
3329     logging.info("Shutting down instance %s on node %s",
3330                  instance.name, instance.primary_node)
3331
3332     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3333     msg = result.fail_msg
3334     if msg:
3335       if self.op.ignore_failures:
3336         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3337       else:
3338         raise errors.OpExecError("Could not shutdown instance %s on"
3339                                  " node %s: %s" %
3340                                  (instance.name, instance.primary_node, msg))
3341
3342     logging.info("Removing block devices for instance %s", instance.name)
3343
3344     if not _RemoveDisks(self, instance):
3345       if self.op.ignore_failures:
3346         feedback_fn("Warning: can't remove instance's disks")
3347       else:
3348         raise errors.OpExecError("Can't remove instance's disks")
3349
3350     logging.info("Removing instance %s out of cluster config", instance.name)
3351
3352     self.cfg.RemoveInstance(instance.name)
3353     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3354
3355
3356 class LUQueryInstances(NoHooksLU):
3357   """Logical unit for querying instances.
3358
3359   """
3360   _OP_REQP = ["output_fields", "names", "use_locking"]
3361   REQ_BGL = False
3362   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3363                                     "admin_state",
3364                                     "disk_template", "ip", "mac", "bridge",
3365                                     "nic_mode", "nic_link",
3366                                     "sda_size", "sdb_size", "vcpus", "tags",
3367                                     "network_port", "beparams",
3368                                     r"(disk)\.(size)/([0-9]+)",
3369                                     r"(disk)\.(sizes)", "disk_usage",
3370                                     r"(nic)\.(mac|ip|mode|link)/([0-9]+)",
3371                                     r"(nic)\.(bridge)/([0-9]+)",
3372                                     r"(nic)\.(macs|ips|modes|links|bridges)",
3373                                     r"(disk|nic)\.(count)",
3374                                     "serial_no", "hypervisor", "hvparams",] +
3375                                   ["hv/%s" % name
3376                                    for name in constants.HVS_PARAMETERS] +
3377                                   ["be/%s" % name
3378                                    for name in constants.BES_PARAMETERS])
3379   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3380
3381
3382   def ExpandNames(self):
3383     _CheckOutputFields(static=self._FIELDS_STATIC,
3384                        dynamic=self._FIELDS_DYNAMIC,
3385                        selected=self.op.output_fields)
3386
3387     self.needed_locks = {}
3388     self.share_locks[locking.LEVEL_INSTANCE] = 1
3389     self.share_locks[locking.LEVEL_NODE] = 1
3390
3391     if self.op.names:
3392       self.wanted = _GetWantedInstances(self, self.op.names)
3393     else:
3394       self.wanted = locking.ALL_SET
3395
3396     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3397     self.do_locking = self.do_node_query and self.op.use_locking
3398     if self.do_locking:
3399       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3400       self.needed_locks[locking.LEVEL_NODE] = []
3401       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3402
3403   def DeclareLocks(self, level):
3404     if level == locking.LEVEL_NODE and self.do_locking:
3405       self._LockInstancesNodes()
3406
3407   def CheckPrereq(self):
3408     """Check prerequisites.
3409
3410     """
3411     pass
3412
3413   def Exec(self, feedback_fn):
3414     """Computes the list of nodes and their attributes.
3415
3416     """
3417     all_info = self.cfg.GetAllInstancesInfo()
3418     if self.wanted == locking.ALL_SET:
3419       # caller didn't specify instance names, so ordering is not important
3420       if self.do_locking:
3421         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3422       else:
3423         instance_names = all_info.keys()
3424       instance_names = utils.NiceSort(instance_names)
3425     else:
3426       # caller did specify names, so we must keep the ordering
3427       if self.do_locking:
3428         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3429       else:
3430         tgt_set = all_info.keys()
3431       missing = set(self.wanted).difference(tgt_set)
3432       if missing:
3433         raise errors.OpExecError("Some instances were removed before"
3434                                  " retrieving their data: %s" % missing)
3435       instance_names = self.wanted
3436
3437     instance_list = [all_info[iname] for iname in instance_names]
3438
3439     # begin data gathering
3440
3441     nodes = frozenset([inst.primary_node for inst in instance_list])
3442     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3443
3444     bad_nodes = []
3445     off_nodes = []
3446     if self.do_node_query:
3447       live_data = {}
3448       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3449       for name in nodes:
3450         result = node_data[name]
3451         if result.offline:
3452           # offline nodes will be in both lists
3453           off_nodes.append(name)
3454         if result.failed or result.fail_msg:
3455           bad_nodes.append(name)
3456         else:
3457           if result.payload:
3458             live_data.update(result.payload)
3459           # else no instance is alive
3460     else:
3461       live_data = dict([(name, {}) for name in instance_names])
3462
3463     # end data gathering
3464
3465     HVPREFIX = "hv/"
3466     BEPREFIX = "be/"
3467     output = []
3468     cluster = self.cfg.GetClusterInfo()
3469     for instance in instance_list:
3470       iout = []
3471       i_hv = cluster.FillHV(instance)
3472       i_be = cluster.FillBE(instance)
3473       i_nicp = [objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
3474                                  nic.nicparams) for nic in instance.nics]
3475       for field in self.op.output_fields:
3476         st_match = self._FIELDS_STATIC.Matches(field)
3477         if field == "name":
3478           val = instance.name
3479         elif field == "os":
3480           val = instance.os
3481         elif field == "pnode":
3482           val = instance.primary_node
3483         elif field == "snodes":
3484           val = list(instance.secondary_nodes)
3485         elif field == "admin_state":
3486           val = instance.admin_up
3487         elif field == "oper_state":
3488           if instance.primary_node in bad_nodes:
3489             val = None
3490           else:
3491             val = bool(live_data.get(instance.name))
3492         elif field == "status":
3493           if instance.primary_node in off_nodes:
3494             val = "ERROR_nodeoffline"
3495           elif instance.primary_node in bad_nodes:
3496             val = "ERROR_nodedown"
3497           else:
3498             running = bool(live_data.get(instance.name))
3499             if running:
3500               if instance.admin_up:
3501                 val = "running"
3502               else:
3503                 val = "ERROR_up"
3504             else:
3505               if instance.admin_up:
3506                 val = "ERROR_down"
3507               else:
3508                 val = "ADMIN_down"
3509         elif field == "oper_ram":
3510           if instance.primary_node in bad_nodes:
3511             val = None
3512           elif instance.name in live_data:
3513             val = live_data[instance.name].get("memory", "?")
3514           else:
3515             val = "-"
3516         elif field == "disk_template":
3517           val = instance.disk_template
3518         elif field == "ip":
3519           if instance.nics:
3520             val = instance.nics[0].ip
3521           else:
3522             val = None
3523         elif field == "nic_mode":
3524           if instance.nics:
3525             val = i_nicp[0][constants.NIC_MODE]
3526           else:
3527             val = None
3528         elif field == "nic_link":
3529           if instance.nics:
3530             val = i_nicp[0][constants.NIC_LINK]
3531           else:
3532             val = None
3533         elif field == "bridge":
3534           if (instance.nics and
3535               i_nicp[0][constants.NIC_MODE] == constants.NIC_MODE_BRIDGED):
3536             val = i_nicp[0][constants.NIC_LINK]
3537           else:
3538             val = None
3539         elif field == "mac":
3540           if instance.nics:
3541             val = instance.nics[0].mac
3542           else:
3543             val = None
3544         elif field == "sda_size" or field == "sdb_size":
3545           idx = ord(field[2]) - ord('a')
3546           try:
3547             val = instance.FindDisk(idx).size
3548           except errors.OpPrereqError:
3549             val = None
3550         elif field == "disk_usage": # total disk usage per node
3551           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3552           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3553         elif field == "tags":
3554           val = list(instance.GetTags())
3555         elif field == "serial_no":
3556           val = instance.serial_no
3557         elif field == "network_port":
3558           val = instance.network_port
3559         elif field == "hypervisor":
3560           val = instance.hypervisor
3561         elif field == "hvparams":
3562           val = i_hv
3563         elif (field.startswith(HVPREFIX) and
3564               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3565           val = i_hv.get(field[len(HVPREFIX):], None)
3566         elif field == "beparams":
3567           val = i_be
3568         elif (field.startswith(BEPREFIX) and
3569               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3570           val = i_be.get(field[len(BEPREFIX):], None)
3571         elif st_match and st_match.groups():
3572           # matches a variable list
3573           st_groups = st_match.groups()
3574           if st_groups and st_groups[0] == "disk":
3575             if st_groups[1] == "count":
3576               val = len(instance.disks)
3577             elif st_groups[1] == "sizes":
3578               val = [disk.size for disk in instance.disks]
3579             elif st_groups[1] == "size":
3580               try:
3581                 val = instance.FindDisk(st_groups[2]).size
3582               except errors.OpPrereqError:
3583                 val = None
3584             else:
3585               assert False, "Unhandled disk parameter"
3586           elif st_groups[0] == "nic":
3587             if st_groups[1] == "count":
3588               val = len(instance.nics)
3589             elif st_groups[1] == "macs":
3590               val = [nic.mac for nic in instance.nics]
3591             elif st_groups[1] == "ips":
3592               val = [nic.ip for nic in instance.nics]
3593             elif st_groups[1] == "modes":
3594               val = [nicp[constants.NIC_MODE] for nicp in i_nicp]
3595             elif st_groups[1] == "links":
3596               val = [nicp[constants.NIC_LINK] for nicp in i_nicp]
3597             elif st_groups[1] == "bridges":
3598               val = []
3599               for nicp in i_nicp:
3600                 if nicp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3601                   val.append(nicp[constants.NIC_LINK])
3602                 else:
3603                   val.append(None)
3604             else:
3605               # index-based item
3606               nic_idx = int(st_groups[2])
3607               if nic_idx >= len(instance.nics):
3608                 val = None
3609               else:
3610                 if st_groups[1] == "mac":
3611                   val = instance.nics[nic_idx].mac
3612                 elif st_groups[1] == "ip":
3613                   val = instance.nics[nic_idx].ip
3614                 elif st_groups[1] == "mode":
3615                   val = i_nicp[nic_idx][constants.NIC_MODE]
3616                 elif st_groups[1] == "link":
3617                   val = i_nicp[nic_idx][constants.NIC_LINK]
3618                 elif st_groups[1] == "bridge":
3619                   nic_mode = i_nicp[nic_idx][constants.NIC_MODE]
3620                   if nic_mode == constants.NIC_MODE_BRIDGED:
3621                     val = i_nicp[nic_idx][constants.NIC_LINK]
3622                   else:
3623                     val = None
3624                 else:
3625                   assert False, "Unhandled NIC parameter"
3626           else:
3627             assert False, "Unhandled variable parameter"
3628         else:
3629           raise errors.ParameterError(field)
3630         iout.append(val)
3631       output.append(iout)
3632
3633     return output
3634
3635
3636 class LUFailoverInstance(LogicalUnit):
3637   """Failover an instance.
3638
3639   """
3640   HPATH = "instance-failover"
3641   HTYPE = constants.HTYPE_INSTANCE
3642   _OP_REQP = ["instance_name", "ignore_consistency"]
3643   REQ_BGL = False
3644
3645   def ExpandNames(self):
3646     self._ExpandAndLockInstance()
3647     self.needed_locks[locking.LEVEL_NODE] = []
3648     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3649
3650   def DeclareLocks(self, level):
3651     if level == locking.LEVEL_NODE:
3652       self._LockInstancesNodes()
3653
3654   def BuildHooksEnv(self):
3655     """Build hooks env.
3656
3657     This runs on master, primary and secondary nodes of the instance.
3658
3659     """
3660     env = {
3661       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3662       }
3663     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3664     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3665     return env, nl, nl
3666
3667   def CheckPrereq(self):
3668     """Check prerequisites.
3669
3670     This checks that the instance is in the cluster.
3671
3672     """
3673     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3674     assert self.instance is not None, \
3675       "Cannot retrieve locked instance %s" % self.op.instance_name
3676
3677     bep = self.cfg.GetClusterInfo().FillBE(instance)
3678     if instance.disk_template not in constants.DTS_NET_MIRROR:
3679       raise errors.OpPrereqError("Instance's disk layout is not"
3680                                  " network mirrored, cannot failover.")
3681
3682     secondary_nodes = instance.secondary_nodes
3683     if not secondary_nodes:
3684       raise errors.ProgrammerError("no secondary node but using "
3685                                    "a mirrored disk template")
3686
3687     target_node = secondary_nodes[0]
3688     _CheckNodeOnline(self, target_node)
3689     _CheckNodeNotDrained(self, target_node)
3690     if instance.admin_up:
3691       # check memory requirements on the secondary node
3692       _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3693                            instance.name, bep[constants.BE_MEMORY],
3694                            instance.hypervisor)
3695     else:
3696       self.LogInfo("Not checking memory on the secondary node as"
3697                    " instance will not be started")
3698
3699     # check bridge existance
3700     _CheckInstanceBridgesExist(self, instance, node=target_node)
3701
3702   def Exec(self, feedback_fn):
3703     """Failover an instance.
3704
3705     The failover is done by shutting it down on its present node and
3706     starting it on the secondary.
3707
3708     """
3709     instance = self.instance
3710
3711     source_node = instance.primary_node
3712     target_node = instance.secondary_nodes[0]
3713
3714     feedback_fn("* checking disk consistency between source and target")
3715     for dev in instance.disks:
3716       # for drbd, these are drbd over lvm
3717       if not _CheckDiskConsistency(self, dev, target_node, False):
3718         if instance.admin_up and not self.op.ignore_consistency:
3719           raise errors.OpExecError("Disk %s is degraded on target node,"
3720                                    " aborting failover." % dev.iv_name)
3721
3722     feedback_fn("* shutting down instance on source node")
3723     logging.info("Shutting down instance %s on node %s",
3724                  instance.name, source_node)
3725
3726     result = self.rpc.call_instance_shutdown(source_node, instance)
3727     msg = result.fail_msg
3728     if msg:
3729       if self.op.ignore_consistency:
3730         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3731                              " Proceeding anyway. Please make sure node"
3732                              " %s is down. Error details: %s",
3733                              instance.name, source_node, source_node, msg)
3734       else:
3735         raise errors.OpExecError("Could not shutdown instance %s on"
3736                                  " node %s: %s" %
3737                                  (instance.name, source_node, msg))
3738
3739     feedback_fn("* deactivating the instance's disks on source node")
3740     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3741       raise errors.OpExecError("Can't shut down the instance's disks.")
3742
3743     instance.primary_node = target_node
3744     # distribute new instance config to the other nodes
3745     self.cfg.Update(instance)
3746
3747     # Only start the instance if it's marked as up
3748     if instance.admin_up:
3749       feedback_fn("* activating the instance's disks on target node")
3750       logging.info("Starting instance %s on node %s",
3751                    instance.name, target_node)
3752
3753       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3754                                                ignore_secondaries=True)
3755       if not disks_ok:
3756         _ShutdownInstanceDisks(self, instance)
3757         raise errors.OpExecError("Can't activate the instance's disks")
3758
3759       feedback_fn("* starting the instance on the target node")
3760       result = self.rpc.call_instance_start(target_node, instance, None, None)
3761       msg = result.fail_msg
3762       if msg:
3763         _ShutdownInstanceDisks(self, instance)
3764         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3765                                  (instance.name, target_node, msg))
3766
3767
3768 class LUMigrateInstance(LogicalUnit):
3769   """Migrate an instance.
3770
3771   This is migration without shutting down, compared to the failover,
3772   which is done with shutdown.
3773
3774   """
3775   HPATH = "instance-migrate"
3776   HTYPE = constants.HTYPE_INSTANCE
3777   _OP_REQP = ["instance_name", "live", "cleanup"]
3778
3779   REQ_BGL = False
3780
3781   def ExpandNames(self):
3782     self._ExpandAndLockInstance()
3783     self.needed_locks[locking.LEVEL_NODE] = []
3784     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3785
3786   def DeclareLocks(self, level):
3787     if level == locking.LEVEL_NODE:
3788       self._LockInstancesNodes()
3789
3790   def BuildHooksEnv(self):
3791     """Build hooks env.
3792
3793     This runs on master, primary and secondary nodes of the instance.
3794
3795     """
3796     env = _BuildInstanceHookEnvByObject(self, self.instance)
3797     env["MIGRATE_LIVE"] = self.op.live
3798     env["MIGRATE_CLEANUP"] = self.op.cleanup
3799     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3800     return env, nl, nl
3801
3802   def CheckPrereq(self):
3803     """Check prerequisites.
3804
3805     This checks that the instance is in the cluster.
3806
3807     """
3808     instance = self.cfg.GetInstanceInfo(
3809       self.cfg.ExpandInstanceName(self.op.instance_name))
3810     if instance is None:
3811       raise errors.OpPrereqError("Instance '%s' not known" %
3812                                  self.op.instance_name)
3813
3814     if instance.disk_template != constants.DT_DRBD8:
3815       raise errors.OpPrereqError("Instance's disk layout is not"
3816                                  " drbd8, cannot migrate.")
3817
3818     secondary_nodes = instance.secondary_nodes
3819     if not secondary_nodes:
3820       raise errors.ConfigurationError("No secondary node but using"
3821                                       " drbd8 disk template")
3822
3823     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3824
3825     target_node = secondary_nodes[0]
3826     # check memory requirements on the secondary node
3827     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3828                          instance.name, i_be[constants.BE_MEMORY],
3829                          instance.hypervisor)
3830
3831     # check bridge existance
3832     _CheckInstanceBridgesExist(self, instance, node=target_node)
3833
3834     if not self.op.cleanup:
3835       _CheckNodeNotDrained(self, target_node)
3836       result = self.rpc.call_instance_migratable(instance.primary_node,
3837                                                  instance)
3838       result.Raise("Can't migrate, please use failover", prereq=True)
3839
3840     self.instance = instance
3841
3842   def _WaitUntilSync(self):
3843     """Poll with custom rpc for disk sync.
3844
3845     This uses our own step-based rpc call.
3846
3847     """
3848     self.feedback_fn("* wait until resync is done")
3849     all_done = False
3850     while not all_done:
3851       all_done = True
3852       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3853                                             self.nodes_ip,
3854                                             self.instance.disks)
3855       min_percent = 100
3856       for node, nres in result.items():
3857         nres.Raise("Cannot resync disks on node %s" % node)
3858         node_done, node_percent = nres.payload
3859         all_done = all_done and node_done
3860         if node_percent is not None:
3861           min_percent = min(min_percent, node_percent)
3862       if not all_done:
3863         if min_percent < 100:
3864           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3865         time.sleep(2)
3866
3867   def _EnsureSecondary(self, node):
3868     """Demote a node to secondary.
3869
3870     """
3871     self.feedback_fn("* switching node %s to secondary mode" % node)
3872
3873     for dev in self.instance.disks:
3874       self.cfg.SetDiskID(dev, node)
3875
3876     result = self.rpc.call_blockdev_close(node, self.instance.name,
3877                                           self.instance.disks)
3878     result.Raise("Cannot change disk to secondary on node %s" % node)
3879
3880   def _GoStandalone(self):
3881     """Disconnect from the network.
3882
3883     """
3884     self.feedback_fn("* changing into standalone mode")
3885     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3886                                                self.instance.disks)
3887     for node, nres in result.items():
3888       nres.Raise("Cannot disconnect disks node %s" % node)
3889
3890   def _GoReconnect(self, multimaster):
3891     """Reconnect to the network.
3892
3893     """
3894     if multimaster:
3895       msg = "dual-master"
3896     else:
3897       msg = "single-master"
3898     self.feedback_fn("* changing disks into %s mode" % msg)
3899     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3900                                            self.instance.disks,
3901                                            self.instance.name, multimaster)
3902     for node, nres in result.items():
3903       nres.Raise("Cannot change disks config on node %s" % node)
3904
3905   def _ExecCleanup(self):
3906     """Try to cleanup after a failed migration.
3907
3908     The cleanup is done by:
3909       - check that the instance is running only on one node
3910         (and update the config if needed)
3911       - change disks on its secondary node to secondary
3912       - wait until disks are fully synchronized
3913       - disconnect from the network
3914       - change disks into single-master mode
3915       - wait again until disks are fully synchronized
3916
3917     """
3918     instance = self.instance
3919     target_node = self.target_node
3920     source_node = self.source_node
3921
3922     # check running on only one node
3923     self.feedback_fn("* checking where the instance actually runs"
3924                      " (if this hangs, the hypervisor might be in"
3925                      " a bad state)")
3926     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3927     for node, result in ins_l.items():
3928       result.Raise("Can't contact node %s" % node)
3929
3930     runningon_source = instance.name in ins_l[source_node].payload
3931     runningon_target = instance.name in ins_l[target_node].payload
3932
3933     if runningon_source and runningon_target:
3934       raise errors.OpExecError("Instance seems to be running on two nodes,"
3935                                " or the hypervisor is confused. You will have"
3936                                " to ensure manually that it runs only on one"
3937                                " and restart this operation.")
3938
3939     if not (runningon_source or runningon_target):
3940       raise errors.OpExecError("Instance does not seem to be running at all."
3941                                " In this case, it's safer to repair by"
3942                                " running 'gnt-instance stop' to ensure disk"
3943                                " shutdown, and then restarting it.")
3944
3945     if runningon_target:
3946       # the migration has actually succeeded, we need to update the config
3947       self.feedback_fn("* instance running on secondary node (%s),"
3948                        " updating config" % target_node)
3949       instance.primary_node = target_node
3950       self.cfg.Update(instance)
3951       demoted_node = source_node
3952     else:
3953       self.feedback_fn("* instance confirmed to be running on its"
3954                        " primary node (%s)" % source_node)
3955       demoted_node = target_node
3956
3957     self._EnsureSecondary(demoted_node)
3958     try:
3959       self._WaitUntilSync()
3960     except errors.OpExecError:
3961       # we ignore here errors, since if the device is standalone, it
3962       # won't be able to sync
3963       pass
3964     self._GoStandalone()
3965     self._GoReconnect(False)
3966     self._WaitUntilSync()
3967
3968     self.feedback_fn("* done")
3969
3970   def _RevertDiskStatus(self):
3971     """Try to revert the disk status after a failed migration.
3972
3973     """
3974     target_node = self.target_node
3975     try:
3976       self._EnsureSecondary(target_node)
3977       self._GoStandalone()
3978       self._GoReconnect(False)
3979       self._WaitUntilSync()
3980     except errors.OpExecError, err:
3981       self.LogWarning("Migration failed and I can't reconnect the"
3982                       " drives: error '%s'\n"
3983                       "Please look and recover the instance status" %
3984                       str(err))
3985
3986   def _AbortMigration(self):
3987     """Call the hypervisor code to abort a started migration.
3988
3989     """
3990     instance = self.instance
3991     target_node = self.target_node
3992     migration_info = self.migration_info
3993
3994     abort_result = self.rpc.call_finalize_migration(target_node,
3995                                                     instance,
3996                                                     migration_info,
3997                                                     False)
3998     abort_msg = abort_result.fail_msg
3999     if abort_msg:
4000       logging.error("Aborting migration failed on target node %s: %s" %
4001                     (target_node, abort_msg))
4002       # Don't raise an exception here, as we stil have to try to revert the
4003       # disk status, even if this step failed.
4004
4005   def _ExecMigration(self):
4006     """Migrate an instance.
4007
4008     The migrate is done by:
4009       - change the disks into dual-master mode
4010       - wait until disks are fully synchronized again
4011       - migrate the instance
4012       - change disks on the new secondary node (the old primary) to secondary
4013       - wait until disks are fully synchronized
4014       - change disks into single-master mode
4015
4016     """
4017     instance = self.instance
4018     target_node = self.target_node
4019     source_node = self.source_node
4020
4021     self.feedback_fn("* checking disk consistency between source and target")
4022     for dev in instance.disks:
4023       if not _CheckDiskConsistency(self, dev, target_node, False):
4024         raise errors.OpExecError("Disk %s is degraded or not fully"
4025                                  " synchronized on target node,"
4026                                  " aborting migrate." % dev.iv_name)
4027
4028     # First get the migration information from the remote node
4029     result = self.rpc.call_migration_info(source_node, instance)
4030     msg = result.fail_msg
4031     if msg:
4032       log_err = ("Failed fetching source migration information from %s: %s" %
4033                  (source_node, msg))
4034       logging.error(log_err)
4035       raise errors.OpExecError(log_err)
4036
4037     self.migration_info = migration_info = result.payload
4038
4039     # Then switch the disks to master/master mode
4040     self._EnsureSecondary(target_node)
4041     self._GoStandalone()
4042     self._GoReconnect(True)
4043     self._WaitUntilSync()
4044
4045     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4046     result = self.rpc.call_accept_instance(target_node,
4047                                            instance,
4048                                            migration_info,
4049                                            self.nodes_ip[target_node])
4050
4051     msg = result.fail_msg
4052     if msg:
4053       logging.error("Instance pre-migration failed, trying to revert"
4054                     " disk status: %s", msg)
4055       self._AbortMigration()
4056       self._RevertDiskStatus()
4057       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4058                                (instance.name, msg))
4059
4060     self.feedback_fn("* migrating instance to %s" % target_node)
4061     time.sleep(10)
4062     result = self.rpc.call_instance_migrate(source_node, instance,
4063                                             self.nodes_ip[target_node],
4064                                             self.op.live)
4065     msg = result.fail_msg
4066     if msg:
4067       logging.error("Instance migration failed, trying to revert"
4068                     " disk status: %s", msg)
4069       self._AbortMigration()
4070       self._RevertDiskStatus()
4071       raise errors.OpExecError("Could not migrate instance %s: %s" %
4072                                (instance.name, msg))
4073     time.sleep(10)
4074
4075     instance.primary_node = target_node
4076     # distribute new instance config to the other nodes
4077     self.cfg.Update(instance)
4078
4079     result = self.rpc.call_finalize_migration(target_node,
4080                                               instance,
4081                                               migration_info,
4082                                               True)
4083     msg = result.fail_msg
4084     if msg:
4085       logging.error("Instance migration succeeded, but finalization failed:"
4086                     " %s" % msg)
4087       raise errors.OpExecError("Could not finalize instance migration: %s" %
4088                                msg)
4089
4090     self._EnsureSecondary(source_node)
4091     self._WaitUntilSync()
4092     self._GoStandalone()
4093     self._GoReconnect(False)
4094     self._WaitUntilSync()
4095
4096     self.feedback_fn("* done")
4097
4098   def Exec(self, feedback_fn):
4099     """Perform the migration.
4100
4101     """
4102     self.feedback_fn = feedback_fn
4103
4104     self.source_node = self.instance.primary_node
4105     self.target_node = self.instance.secondary_nodes[0]
4106     self.all_nodes = [self.source_node, self.target_node]
4107     self.nodes_ip = {
4108       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4109       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4110       }
4111     if self.op.cleanup:
4112       return self._ExecCleanup()
4113     else:
4114       return self._ExecMigration()
4115
4116
4117 def _CreateBlockDev(lu, node, instance, device, force_create,
4118                     info, force_open):
4119   """Create a tree of block devices on a given node.
4120
4121   If this device type has to be created on secondaries, create it and
4122   all its children.
4123
4124   If not, just recurse to children keeping the same 'force' value.
4125
4126   @param lu: the lu on whose behalf we execute
4127   @param node: the node on which to create the device
4128   @type instance: L{objects.Instance}
4129   @param instance: the instance which owns the device
4130   @type device: L{objects.Disk}
4131   @param device: the device to create
4132   @type force_create: boolean
4133   @param force_create: whether to force creation of this device; this
4134       will be change to True whenever we find a device which has
4135       CreateOnSecondary() attribute
4136   @param info: the extra 'metadata' we should attach to the device
4137       (this will be represented as a LVM tag)
4138   @type force_open: boolean
4139   @param force_open: this parameter will be passes to the
4140       L{backend.BlockdevCreate} function where it specifies
4141       whether we run on primary or not, and it affects both
4142       the child assembly and the device own Open() execution
4143
4144   """
4145   if device.CreateOnSecondary():
4146     force_create = True
4147
4148   if device.children:
4149     for child in device.children:
4150       _CreateBlockDev(lu, node, instance, child, force_create,
4151                       info, force_open)
4152
4153   if not force_create:
4154     return
4155
4156   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4157
4158
4159 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4160   """Create a single block device on a given node.
4161
4162   This will not recurse over children of the device, so they must be
4163   created in advance.
4164
4165   @param lu: the lu on whose behalf we execute
4166   @param node: the node on which to create the device
4167   @type instance: L{objects.Instance}
4168   @param instance: the instance which owns the device
4169   @type device: L{objects.Disk}
4170   @param device: the device to create
4171   @param info: the extra 'metadata' we should attach to the device
4172       (this will be represented as a LVM tag)
4173   @type force_open: boolean
4174   @param force_open: this parameter will be passes to the
4175       L{backend.BlockdevCreate} function where it specifies
4176       whether we run on primary or not, and it affects both
4177       the child assembly and the device own Open() execution
4178
4179   """
4180   lu.cfg.SetDiskID(device, node)
4181   result = lu.rpc.call_blockdev_create(node, device, device.size,
4182                                        instance.name, force_open, info)
4183   result.Raise("Can't create block device %s on"
4184                " node %s for instance %s" % (device, node, instance.name))
4185   if device.physical_id is None:
4186     device.physical_id = result.payload
4187
4188
4189 def _GenerateUniqueNames(lu, exts):
4190   """Generate a suitable LV name.
4191
4192   This will generate a logical volume name for the given instance.
4193
4194   """
4195   results = []
4196   for val in exts:
4197     new_id = lu.cfg.GenerateUniqueID()
4198     results.append("%s%s" % (new_id, val))
4199   return results
4200
4201
4202 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4203                          p_minor, s_minor):
4204   """Generate a drbd8 device complete with its children.
4205
4206   """
4207   port = lu.cfg.AllocatePort()
4208   vgname = lu.cfg.GetVGName()
4209   shared_secret = lu.cfg.GenerateDRBDSecret()
4210   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4211                           logical_id=(vgname, names[0]))
4212   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4213                           logical_id=(vgname, names[1]))
4214   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4215                           logical_id=(primary, secondary, port,
4216                                       p_minor, s_minor,
4217                                       shared_secret),
4218                           children=[dev_data, dev_meta],
4219                           iv_name=iv_name)
4220   return drbd_dev
4221
4222
4223 def _GenerateDiskTemplate(lu, template_name,
4224                           instance_name, primary_node,
4225                           secondary_nodes, disk_info,
4226                           file_storage_dir, file_driver,
4227                           base_index):
4228   """Generate the entire disk layout for a given template type.
4229
4230   """
4231   #TODO: compute space requirements
4232
4233   vgname = lu.cfg.GetVGName()
4234   disk_count = len(disk_info)
4235   disks = []
4236   if template_name == constants.DT_DISKLESS:
4237     pass
4238   elif template_name == constants.DT_PLAIN:
4239     if len(secondary_nodes) != 0:
4240       raise errors.ProgrammerError("Wrong template configuration")
4241
4242     names = _GenerateUniqueNames(lu, [".disk%d" % i
4243                                       for i in range(disk_count)])
4244     for idx, disk in enumerate(disk_info):
4245       disk_index = idx + base_index
4246       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4247                               logical_id=(vgname, names[idx]),
4248                               iv_name="disk/%d" % disk_index,
4249                               mode=disk["mode"])
4250       disks.append(disk_dev)
4251   elif template_name == constants.DT_DRBD8:
4252     if len(secondary_nodes) != 1:
4253       raise errors.ProgrammerError("Wrong template configuration")
4254     remote_node = secondary_nodes[0]
4255     minors = lu.cfg.AllocateDRBDMinor(
4256       [primary_node, remote_node] * len(disk_info), instance_name)
4257
4258     names = []
4259     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4260                                                for i in range(disk_count)]):
4261       names.append(lv_prefix + "_data")
4262       names.append(lv_prefix + "_meta")
4263     for idx, disk in enumerate(disk_info):
4264       disk_index = idx + base_index
4265       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4266                                       disk["size"], names[idx*2:idx*2+2],
4267                                       "disk/%d" % disk_index,
4268                                       minors[idx*2], minors[idx*2+1])
4269       disk_dev.mode = disk["mode"]
4270       disks.append(disk_dev)
4271   elif template_name == constants.DT_FILE:
4272     if len(secondary_nodes) != 0:
4273       raise errors.ProgrammerError("Wrong template configuration")
4274
4275     for idx, disk in enumerate(disk_info):
4276       disk_index = idx + base_index
4277       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4278                               iv_name="disk/%d" % disk_index,
4279                               logical_id=(file_driver,
4280                                           "%s/disk%d" % (file_storage_dir,
4281                                                          disk_index)),
4282                               mode=disk["mode"])
4283       disks.append(disk_dev)
4284   else:
4285     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4286   return disks
4287
4288
4289 def _GetInstanceInfoText(instance):
4290   """Compute that text that should be added to the disk's metadata.
4291
4292   """
4293   return "originstname+%s" % instance.name
4294
4295
4296 def _CreateDisks(lu, instance):
4297   """Create all disks for an instance.
4298
4299   This abstracts away some work from AddInstance.
4300
4301   @type lu: L{LogicalUnit}
4302   @param lu: the logical unit on whose behalf we execute
4303   @type instance: L{objects.Instance}
4304   @param instance: the instance whose disks we should create
4305   @rtype: boolean
4306   @return: the success of the creation
4307
4308   """
4309   info = _GetInstanceInfoText(instance)
4310   pnode = instance.primary_node
4311
4312   if instance.disk_template == constants.DT_FILE:
4313     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4314     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4315
4316     result.Raise("Failed to create directory '%s' on"
4317                  " node %s: %s" % (file_storage_dir, pnode))
4318
4319   # Note: this needs to be kept in sync with adding of disks in
4320   # LUSetInstanceParams
4321   for device in instance.disks:
4322     logging.info("Creating volume %s for instance %s",
4323                  device.iv_name, instance.name)
4324     #HARDCODE
4325     for node in instance.all_nodes:
4326       f_create = node == pnode
4327       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4328
4329
4330 def _RemoveDisks(lu, instance):
4331   """Remove all disks for an instance.
4332
4333   This abstracts away some work from `AddInstance()` and
4334   `RemoveInstance()`. Note that in case some of the devices couldn't
4335   be removed, the removal will continue with the other ones (compare
4336   with `_CreateDisks()`).
4337
4338   @type lu: L{LogicalUnit}
4339   @param lu: the logical unit on whose behalf we execute
4340   @type instance: L{objects.Instance}
4341   @param instance: the instance whose disks we should remove
4342   @rtype: boolean
4343   @return: the success of the removal
4344
4345   """
4346   logging.info("Removing block devices for instance %s", instance.name)
4347
4348   all_result = True
4349   for device in instance.disks:
4350     for node, disk in device.ComputeNodeTree(instance.primary_node):
4351       lu.cfg.SetDiskID(disk, node)
4352       msg = lu.rpc.call_blockdev_remove(node, disk).fail_msg
4353       if msg:
4354         lu.LogWarning("Could not remove block device %s on node %s,"
4355                       " continuing anyway: %s", device.iv_name, node, msg)
4356         all_result = False
4357
4358   if instance.disk_template == constants.DT_FILE:
4359     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4360     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4361                                                  file_storage_dir)
4362     msg = result.fail_msg
4363     if msg:
4364       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4365                     file_storage_dir, instance.primary_node, msg)
4366       all_result = False
4367
4368   return all_result
4369
4370
4371 def _ComputeDiskSize(disk_template, disks):
4372   """Compute disk size requirements in the volume group
4373
4374   """
4375   # Required free disk space as a function of disk and swap space
4376   req_size_dict = {
4377     constants.DT_DISKLESS: None,
4378     constants.DT_PLAIN: sum(d["size"] for d in disks),
4379     # 128 MB are added for drbd metadata for each disk
4380     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4381     constants.DT_FILE: None,
4382   }
4383
4384   if disk_template not in req_size_dict:
4385     raise errors.ProgrammerError("Disk template '%s' size requirement"
4386                                  " is unknown" %  disk_template)
4387
4388   return req_size_dict[disk_template]
4389
4390
4391 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4392   """Hypervisor parameter validation.
4393
4394   This function abstract the hypervisor parameter validation to be
4395   used in both instance create and instance modify.
4396
4397   @type lu: L{LogicalUnit}
4398   @param lu: the logical unit for which we check
4399   @type nodenames: list
4400   @param nodenames: the list of nodes on which we should check
4401   @type hvname: string
4402   @param hvname: the name of the hypervisor we should use
4403   @type hvparams: dict
4404   @param hvparams: the parameters which we need to check
4405   @raise errors.OpPrereqError: if the parameters are not valid
4406
4407   """
4408   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4409                                                   hvname,
4410                                                   hvparams)
4411   for node in nodenames:
4412     info = hvinfo[node]
4413     if info.offline:
4414       continue
4415     info.Raise("Hypervisor parameter validation failed on node %s" % node)
4416
4417
4418 class LUCreateInstance(LogicalUnit):
4419   """Create an instance.
4420
4421   """
4422   HPATH = "instance-add"
4423   HTYPE = constants.HTYPE_INSTANCE
4424   _OP_REQP = ["instance_name", "disks", "disk_template",
4425               "mode", "start",
4426               "wait_for_sync", "ip_check", "nics",
4427               "hvparams", "beparams"]
4428   REQ_BGL = False
4429
4430   def _ExpandNode(self, node):
4431     """Expands and checks one node name.
4432
4433     """
4434     node_full = self.cfg.ExpandNodeName(node)
4435     if node_full is None:
4436       raise errors.OpPrereqError("Unknown node %s" % node)
4437     return node_full
4438
4439   def ExpandNames(self):
4440     """ExpandNames for CreateInstance.
4441
4442     Figure out the right locks for instance creation.
4443
4444     """
4445     self.needed_locks = {}
4446
4447     # set optional parameters to none if they don't exist
4448     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4449       if not hasattr(self.op, attr):
4450         setattr(self.op, attr, None)
4451
4452     # cheap checks, mostly valid constants given
4453
4454     # verify creation mode
4455     if self.op.mode not in (constants.INSTANCE_CREATE,
4456                             constants.INSTANCE_IMPORT):
4457       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4458                                  self.op.mode)
4459
4460     # disk template and mirror node verification
4461     if self.op.disk_template not in constants.DISK_TEMPLATES:
4462       raise errors.OpPrereqError("Invalid disk template name")
4463
4464     if self.op.hypervisor is None:
4465       self.op.hypervisor = self.cfg.GetHypervisorType()
4466
4467     cluster = self.cfg.GetClusterInfo()
4468     enabled_hvs = cluster.enabled_hypervisors
4469     if self.op.hypervisor not in enabled_hvs:
4470       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4471                                  " cluster (%s)" % (self.op.hypervisor,
4472                                   ",".join(enabled_hvs)))
4473
4474     # check hypervisor parameter syntax (locally)
4475     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4476     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4477                                   self.op.hvparams)
4478     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4479     hv_type.CheckParameterSyntax(filled_hvp)
4480     self.hv_full = filled_hvp
4481
4482     # fill and remember the beparams dict
4483     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4484     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4485                                     self.op.beparams)
4486
4487     #### instance parameters check
4488
4489     # instance name verification
4490     hostname1 = utils.HostInfo(self.op.instance_name)
4491     self.op.instance_name = instance_name = hostname1.name
4492
4493     # this is just a preventive check, but someone might still add this
4494     # instance in the meantime, and creation will fail at lock-add time
4495     if instance_name in self.cfg.GetInstanceList():
4496       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4497                                  instance_name)
4498
4499     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4500
4501     # NIC buildup
4502     self.nics = []
4503     for idx, nic in enumerate(self.op.nics):
4504       nic_mode_req = nic.get("mode", None)
4505       nic_mode = nic_mode_req
4506       if nic_mode is None:
4507         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4508
4509       # in routed mode, for the first nic, the default ip is 'auto'
4510       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4511         default_ip_mode = constants.VALUE_AUTO
4512       else:
4513         default_ip_mode = constants.VALUE_NONE
4514
4515       # ip validity checks
4516       ip = nic.get("ip", default_ip_mode)
4517       if ip is None or ip.lower() == constants.VALUE_NONE:
4518         nic_ip = None
4519       elif ip.lower() == constants.VALUE_AUTO:
4520         nic_ip = hostname1.ip
4521       else:
4522         if not utils.IsValidIP(ip):
4523           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4524                                      " like a valid IP" % ip)
4525         nic_ip = ip
4526
4527       # TODO: check the ip for uniqueness !!
4528       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4529         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4530
4531       # MAC address verification
4532       mac = nic.get("mac", constants.VALUE_AUTO)
4533       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4534         if not utils.IsValidMac(mac.lower()):
4535           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4536                                      mac)
4537       # bridge verification
4538       bridge = nic.get("bridge", None)
4539       link = nic.get("link", None)
4540       if bridge and link:
4541         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
4542                                    " at the same time")
4543       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4544         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4545       elif bridge:
4546         link = bridge
4547
4548       nicparams = {}
4549       if nic_mode_req:
4550         nicparams[constants.NIC_MODE] = nic_mode_req
4551       if link:
4552         nicparams[constants.NIC_LINK] = link
4553
4554       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4555                                       nicparams)
4556       objects.NIC.CheckParameterSyntax(check_params)
4557       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4558
4559     # disk checks/pre-build
4560     self.disks = []
4561     for disk in self.op.disks:
4562       mode = disk.get("mode", constants.DISK_RDWR)
4563       if mode not in constants.DISK_ACCESS_SET:
4564         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4565                                    mode)
4566       size = disk.get("size", None)
4567       if size is None:
4568         raise errors.OpPrereqError("Missing disk size")
4569       try:
4570         size = int(size)
4571       except ValueError:
4572         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4573       self.disks.append({"size": size, "mode": mode})
4574
4575     # used in CheckPrereq for ip ping check
4576     self.check_ip = hostname1.ip
4577
4578     # file storage checks
4579     if (self.op.file_driver and
4580         not self.op.file_driver in constants.FILE_DRIVER):
4581       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4582                                  self.op.file_driver)
4583
4584     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4585       raise errors.OpPrereqError("File storage directory path not absolute")
4586
4587     ### Node/iallocator related checks
4588     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4589       raise errors.OpPrereqError("One and only one of iallocator and primary"
4590                                  " node must be given")
4591
4592     if self.op.iallocator:
4593       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4594     else:
4595       self.op.pnode = self._ExpandNode(self.op.pnode)
4596       nodelist = [self.op.pnode]
4597       if self.op.snode is not None:
4598         self.op.snode = self._ExpandNode(self.op.snode)
4599         nodelist.append(self.op.snode)
4600       self.needed_locks[locking.LEVEL_NODE] = nodelist
4601
4602     # in case of import lock the source node too
4603     if self.op.mode == constants.INSTANCE_IMPORT:
4604       src_node = getattr(self.op, "src_node", None)
4605       src_path = getattr(self.op, "src_path", None)
4606
4607       if src_path is None:
4608         self.op.src_path = src_path = self.op.instance_name
4609
4610       if src_node is None:
4611         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4612         self.op.src_node = None
4613         if os.path.isabs(src_path):
4614           raise errors.OpPrereqError("Importing an instance from an absolute"
4615                                      " path requires a source node option.")
4616       else:
4617         self.op.src_node = src_node = self._ExpandNode(src_node)
4618         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4619           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4620         if not os.path.isabs(src_path):
4621           self.op.src_path = src_path = \
4622             os.path.join(constants.EXPORT_DIR, src_path)
4623
4624     else: # INSTANCE_CREATE
4625       if getattr(self.op, "os_type", None) is None:
4626         raise errors.OpPrereqError("No guest OS specified")
4627
4628   def _RunAllocator(self):
4629     """Run the allocator based on input opcode.
4630
4631     """
4632     nics = [n.ToDict() for n in self.nics]
4633     ial = IAllocator(self,
4634                      mode=constants.IALLOCATOR_MODE_ALLOC,
4635                      name=self.op.instance_name,
4636                      disk_template=self.op.disk_template,
4637                      tags=[],
4638                      os=self.op.os_type,
4639                      vcpus=self.be_full[constants.BE_VCPUS],
4640                      mem_size=self.be_full[constants.BE_MEMORY],
4641                      disks=self.disks,
4642                      nics=nics,
4643                      hypervisor=self.op.hypervisor,
4644                      )
4645
4646     ial.Run(self.op.iallocator)
4647
4648     if not ial.success:
4649       raise errors.OpPrereqError("Can't compute nodes using"
4650                                  " iallocator '%s': %s" % (self.op.iallocator,
4651                                                            ial.info))
4652     if len(ial.nodes) != ial.required_nodes:
4653       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4654                                  " of nodes (%s), required %s" %
4655                                  (self.op.iallocator, len(ial.nodes),
4656                                   ial.required_nodes))
4657     self.op.pnode = ial.nodes[0]
4658     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4659                  self.op.instance_name, self.op.iallocator,
4660                  ", ".join(ial.nodes))
4661     if ial.required_nodes == 2:
4662       self.op.snode = ial.nodes[1]
4663
4664   def BuildHooksEnv(self):
4665     """Build hooks env.
4666
4667     This runs on master, primary and secondary nodes of the instance.
4668
4669     """
4670     env = {
4671       "ADD_MODE": self.op.mode,
4672       }
4673     if self.op.mode == constants.INSTANCE_IMPORT:
4674       env["SRC_NODE"] = self.op.src_node
4675       env["SRC_PATH"] = self.op.src_path
4676       env["SRC_IMAGES"] = self.src_images
4677
4678     env.update(_BuildInstanceHookEnv(
4679       name=self.op.instance_name,
4680       primary_node=self.op.pnode,
4681       secondary_nodes=self.secondaries,
4682       status=self.op.start,
4683       os_type=self.op.os_type,
4684       memory=self.be_full[constants.BE_MEMORY],
4685       vcpus=self.be_full[constants.BE_VCPUS],
4686       nics=_NICListToTuple(self, self.nics),
4687       disk_template=self.op.disk_template,
4688       disks=[(d["size"], d["mode"]) for d in self.disks],
4689       bep=self.be_full,
4690       hvp=self.hv_full,
4691       hypervisor=self.op.hypervisor,
4692     ))
4693
4694     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4695           self.secondaries)
4696     return env, nl, nl
4697
4698
4699   def CheckPrereq(self):
4700     """Check prerequisites.
4701
4702     """
4703     if (not self.cfg.GetVGName() and
4704         self.op.disk_template not in constants.DTS_NOT_LVM):
4705       raise errors.OpPrereqError("Cluster does not support lvm-based"
4706                                  " instances")
4707
4708     if self.op.mode == constants.INSTANCE_IMPORT:
4709       src_node = self.op.src_node
4710       src_path = self.op.src_path
4711
4712       if src_node is None:
4713         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4714         exp_list = self.rpc.call_export_list(locked_nodes)
4715         found = False
4716         for node in exp_list:
4717           if exp_list[node].fail_msg:
4718             continue
4719           if src_path in exp_list[node].payload:
4720             found = True
4721             self.op.src_node = src_node = node
4722             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4723                                                        src_path)
4724             break
4725         if not found:
4726           raise errors.OpPrereqError("No export found for relative path %s" %
4727                                       src_path)
4728
4729       _CheckNodeOnline(self, src_node)
4730       result = self.rpc.call_export_info(src_node, src_path)
4731       result.Raise("No export or invalid export found in dir %s" % src_path)
4732
4733       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4734       if not export_info.has_section(constants.INISECT_EXP):
4735         raise errors.ProgrammerError("Corrupted export config")
4736
4737       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4738       if (int(ei_version) != constants.EXPORT_VERSION):
4739         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4740                                    (ei_version, constants.EXPORT_VERSION))
4741
4742       # Check that the new instance doesn't have less disks than the export
4743       instance_disks = len(self.disks)
4744       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4745       if instance_disks < export_disks:
4746         raise errors.OpPrereqError("Not enough disks to import."
4747                                    " (instance: %d, export: %d)" %
4748                                    (instance_disks, export_disks))
4749
4750       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4751       disk_images = []
4752       for idx in range(export_disks):
4753         option = 'disk%d_dump' % idx
4754         if export_info.has_option(constants.INISECT_INS, option):
4755           # FIXME: are the old os-es, disk sizes, etc. useful?
4756           export_name = export_info.get(constants.INISECT_INS, option)
4757           image = os.path.join(src_path, export_name)
4758           disk_images.append(image)
4759         else:
4760           disk_images.append(False)
4761
4762       self.src_images = disk_images
4763
4764       old_name = export_info.get(constants.INISECT_INS, 'name')
4765       # FIXME: int() here could throw a ValueError on broken exports
4766       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4767       if self.op.instance_name == old_name:
4768         for idx, nic in enumerate(self.nics):
4769           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4770             nic_mac_ini = 'nic%d_mac' % idx
4771             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4772
4773     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4774     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4775     if self.op.start and not self.op.ip_check:
4776       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4777                                  " adding an instance in start mode")
4778
4779     if self.op.ip_check:
4780       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4781         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4782                                    (self.check_ip, self.op.instance_name))
4783
4784     #### mac address generation
4785     # By generating here the mac address both the allocator and the hooks get
4786     # the real final mac address rather than the 'auto' or 'generate' value.
4787     # There is a race condition between the generation and the instance object
4788     # creation, which means that we know the mac is valid now, but we're not
4789     # sure it will be when we actually add the instance. If things go bad
4790     # adding the instance will abort because of a duplicate mac, and the
4791     # creation job will fail.
4792     for nic in self.nics:
4793       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4794         nic.mac = self.cfg.GenerateMAC()
4795
4796     #### allocator run
4797
4798     if self.op.iallocator is not None:
4799       self._RunAllocator()
4800
4801     #### node related checks
4802
4803     # check primary node
4804     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4805     assert self.pnode is not None, \
4806       "Cannot retrieve locked node %s" % self.op.pnode
4807     if pnode.offline:
4808       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4809                                  pnode.name)
4810     if pnode.drained:
4811       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4812                                  pnode.name)
4813
4814     self.secondaries = []
4815
4816     # mirror node verification
4817     if self.op.disk_template in constants.DTS_NET_MIRROR:
4818       if self.op.snode is None:
4819         raise errors.OpPrereqError("The networked disk templates need"
4820                                    " a mirror node")
4821       if self.op.snode == pnode.name:
4822         raise errors.OpPrereqError("The secondary node cannot be"
4823                                    " the primary node.")
4824       _CheckNodeOnline(self, self.op.snode)
4825       _CheckNodeNotDrained(self, self.op.snode)
4826       self.secondaries.append(self.op.snode)
4827
4828     nodenames = [pnode.name] + self.secondaries
4829
4830     req_size = _ComputeDiskSize(self.op.disk_template,
4831                                 self.disks)
4832
4833     # Check lv size requirements
4834     if req_size is not None:
4835       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4836                                          self.op.hypervisor)
4837       for node in nodenames:
4838         info = nodeinfo[node]
4839         info.Raise("Cannot get current information from node %s" % node)
4840         info = info.payload
4841         vg_free = info.get('vg_free', None)
4842         if not isinstance(vg_free, int):
4843           raise errors.OpPrereqError("Can't compute free disk space on"
4844                                      " node %s" % node)
4845         if req_size > vg_free:
4846           raise errors.OpPrereqError("Not enough disk space on target node %s."
4847                                      " %d MB available, %d MB required" %
4848                                      (node, vg_free, req_size))
4849
4850     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4851
4852     # os verification
4853     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4854     result.Raise("OS '%s' not in supported os list for primary node %s" %
4855                  (self.op.os_type, pnode.name), prereq=True)
4856
4857     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4858
4859     # memory check on primary node
4860     if self.op.start:
4861       _CheckNodeFreeMemory(self, self.pnode.name,
4862                            "creating instance %s" % self.op.instance_name,
4863                            self.be_full[constants.BE_MEMORY],
4864                            self.op.hypervisor)
4865
4866     self.dry_run_result = list(nodenames)
4867
4868   def Exec(self, feedback_fn):
4869     """Create and add the instance to the cluster.
4870
4871     """
4872     instance = self.op.instance_name
4873     pnode_name = self.pnode.name
4874
4875     ht_kind = self.op.hypervisor
4876     if ht_kind in constants.HTS_REQ_PORT:
4877       network_port = self.cfg.AllocatePort()
4878     else:
4879       network_port = None
4880
4881     ##if self.op.vnc_bind_address is None:
4882     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4883
4884     # this is needed because os.path.join does not accept None arguments
4885     if self.op.file_storage_dir is None:
4886       string_file_storage_dir = ""
4887     else:
4888       string_file_storage_dir = self.op.file_storage_dir
4889
4890     # build the full file storage dir path
4891     file_storage_dir = os.path.normpath(os.path.join(
4892                                         self.cfg.GetFileStorageDir(),
4893                                         string_file_storage_dir, instance))
4894
4895
4896     disks = _GenerateDiskTemplate(self,
4897                                   self.op.disk_template,
4898                                   instance, pnode_name,
4899                                   self.secondaries,
4900                                   self.disks,
4901                                   file_storage_dir,
4902                                   self.op.file_driver,
4903                                   0)
4904
4905     iobj = objects.Instance(name=instance, os=self.op.os_type,
4906                             primary_node=pnode_name,
4907                             nics=self.nics, disks=disks,
4908                             disk_template=self.op.disk_template,
4909                             admin_up=False,
4910                             network_port=network_port,
4911                             beparams=self.op.beparams,
4912                             hvparams=self.op.hvparams,
4913                             hypervisor=self.op.hypervisor,
4914                             )
4915
4916     feedback_fn("* creating instance disks...")
4917     try:
4918       _CreateDisks(self, iobj)
4919     except errors.OpExecError:
4920       self.LogWarning("Device creation failed, reverting...")
4921       try:
4922         _RemoveDisks(self, iobj)
4923       finally:
4924         self.cfg.ReleaseDRBDMinors(instance)
4925         raise
4926
4927     feedback_fn("adding instance %s to cluster config" % instance)
4928
4929     self.cfg.AddInstance(iobj)
4930     # Declare that we don't want to remove the instance lock anymore, as we've
4931     # added the instance to the config
4932     del self.remove_locks[locking.LEVEL_INSTANCE]
4933     # Unlock all the nodes
4934     if self.op.mode == constants.INSTANCE_IMPORT:
4935       nodes_keep = [self.op.src_node]
4936       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4937                        if node != self.op.src_node]
4938       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4939       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4940     else:
4941       self.context.glm.release(locking.LEVEL_NODE)
4942       del self.acquired_locks[locking.LEVEL_NODE]
4943
4944     if self.op.wait_for_sync:
4945       disk_abort = not _WaitForSync(self, iobj)
4946     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4947       # make sure the disks are not degraded (still sync-ing is ok)
4948       time.sleep(15)
4949       feedback_fn("* checking mirrors status")
4950       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4951     else:
4952       disk_abort = False
4953
4954     if disk_abort:
4955       _RemoveDisks(self, iobj)
4956       self.cfg.RemoveInstance(iobj.name)
4957       # Make sure the instance lock gets removed
4958       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4959       raise errors.OpExecError("There are some degraded disks for"
4960                                " this instance")
4961
4962     feedback_fn("creating os for instance %s on node %s" %
4963                 (instance, pnode_name))
4964
4965     if iobj.disk_template != constants.DT_DISKLESS:
4966       if self.op.mode == constants.INSTANCE_CREATE:
4967         feedback_fn("* running the instance OS create scripts...")
4968         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4969         result.Raise("Could not add os for instance %s"
4970                      " on node %s" % (instance, pnode_name))
4971
4972       elif self.op.mode == constants.INSTANCE_IMPORT:
4973         feedback_fn("* running the instance OS import scripts...")
4974         src_node = self.op.src_node
4975         src_images = self.src_images
4976         cluster_name = self.cfg.GetClusterName()
4977         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4978                                                          src_node, src_images,
4979                                                          cluster_name)
4980         msg = import_result.fail_msg
4981         if msg:
4982           self.LogWarning("Error while importing the disk images for instance"
4983                           " %s on node %s: %s" % (instance, pnode_name, msg))
4984       else:
4985         # also checked in the prereq part
4986         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4987                                      % self.op.mode)
4988
4989     if self.op.start:
4990       iobj.admin_up = True
4991       self.cfg.Update(iobj)
4992       logging.info("Starting instance %s on node %s", instance, pnode_name)
4993       feedback_fn("* starting instance...")
4994       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4995       result.Raise("Could not start instance")
4996
4997     return list(iobj.all_nodes)
4998
4999
5000 class LUConnectConsole(NoHooksLU):
5001   """Connect to an instance's console.
5002
5003   This is somewhat special in that it returns the command line that
5004   you need to run on the master node in order to connect to the
5005   console.
5006
5007   """
5008   _OP_REQP = ["instance_name"]
5009   REQ_BGL = False
5010
5011   def ExpandNames(self):
5012     self._ExpandAndLockInstance()
5013
5014   def CheckPrereq(self):
5015     """Check prerequisites.
5016
5017     This checks that the instance is in the cluster.
5018
5019     """
5020     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5021     assert self.instance is not None, \
5022       "Cannot retrieve locked instance %s" % self.op.instance_name
5023     _CheckNodeOnline(self, self.instance.primary_node)
5024
5025   def Exec(self, feedback_fn):
5026     """Connect to the console of an instance
5027
5028     """
5029     instance = self.instance
5030     node = instance.primary_node
5031
5032     node_insts = self.rpc.call_instance_list([node],
5033                                              [instance.hypervisor])[node]
5034     node_insts.Raise("Can't get node information from %s" % node)
5035
5036     if instance.name not in node_insts.payload:
5037       raise errors.OpExecError("Instance %s is not running." % instance.name)
5038
5039     logging.debug("Connecting to console of %s on %s", instance.name, node)
5040
5041     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5042     cluster = self.cfg.GetClusterInfo()
5043     # beparams and hvparams are passed separately, to avoid editing the
5044     # instance and then saving the defaults in the instance itself.
5045     hvparams = cluster.FillHV(instance)
5046     beparams = cluster.FillBE(instance)
5047     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5048
5049     # build ssh cmdline
5050     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5051
5052
5053 class LUReplaceDisks(LogicalUnit):
5054   """Replace the disks of an instance.
5055
5056   """
5057   HPATH = "mirrors-replace"
5058   HTYPE = constants.HTYPE_INSTANCE
5059   _OP_REQP = ["instance_name", "mode", "disks"]
5060   REQ_BGL = False
5061
5062   def CheckArguments(self):
5063     if not hasattr(self.op, "remote_node"):
5064       self.op.remote_node = None
5065     if not hasattr(self.op, "iallocator"):
5066       self.op.iallocator = None
5067
5068     # check for valid parameter combination
5069     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5070     if self.op.mode == constants.REPLACE_DISK_CHG:
5071       if cnt == 2:
5072         raise errors.OpPrereqError("When changing the secondary either an"
5073                                    " iallocator script must be used or the"
5074                                    " new node given")
5075       elif cnt == 0:
5076         raise errors.OpPrereqError("Give either the iallocator or the new"
5077                                    " secondary, not both")
5078     else: # not replacing the secondary
5079       if cnt != 2:
5080         raise errors.OpPrereqError("The iallocator and new node options can"
5081                                    " be used only when changing the"
5082                                    " secondary node")
5083
5084   def ExpandNames(self):
5085     self._ExpandAndLockInstance()
5086
5087     if self.op.iallocator is not None:
5088       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5089     elif self.op.remote_node is not None:
5090       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5091       if remote_node is None:
5092         raise errors.OpPrereqError("Node '%s' not known" %
5093                                    self.op.remote_node)
5094       self.op.remote_node = remote_node
5095       # Warning: do not remove the locking of the new secondary here
5096       # unless DRBD8.AddChildren is changed to work in parallel;
5097       # currently it doesn't since parallel invocations of
5098       # FindUnusedMinor will conflict
5099       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5100       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5101     else:
5102       self.needed_locks[locking.LEVEL_NODE] = []
5103       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5104
5105   def DeclareLocks(self, level):
5106     # If we're not already locking all nodes in the set we have to declare the
5107     # instance's primary/secondary nodes.
5108     if (level == locking.LEVEL_NODE and
5109         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5110       self._LockInstancesNodes()
5111
5112   def _RunAllocator(self):
5113     """Compute a new secondary node using an IAllocator.
5114
5115     """
5116     ial = IAllocator(self,
5117                      mode=constants.IALLOCATOR_MODE_RELOC,
5118                      name=self.op.instance_name,
5119                      relocate_from=[self.sec_node])
5120
5121     ial.Run(self.op.iallocator)
5122
5123     if not ial.success:
5124       raise errors.OpPrereqError("Can't compute nodes using"
5125                                  " iallocator '%s': %s" % (self.op.iallocator,
5126                                                            ial.info))
5127     if len(ial.nodes) != ial.required_nodes:
5128       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5129                                  " of nodes (%s), required %s" %
5130                                  (len(ial.nodes), ial.required_nodes))
5131     self.op.remote_node = ial.nodes[0]
5132     self.LogInfo("Selected new secondary for the instance: %s",
5133                  self.op.remote_node)
5134
5135   def BuildHooksEnv(self):
5136     """Build hooks env.
5137
5138     This runs on the master, the primary and all the secondaries.
5139
5140     """
5141     env = {
5142       "MODE": self.op.mode,
5143       "NEW_SECONDARY": self.op.remote_node,
5144       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5145       }
5146     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5147     nl = [
5148       self.cfg.GetMasterNode(),
5149       self.instance.primary_node,
5150       ]
5151     if self.op.remote_node is not None:
5152       nl.append(self.op.remote_node)
5153     return env, nl, nl
5154
5155   def CheckPrereq(self):
5156     """Check prerequisites.
5157
5158     This checks that the instance is in the cluster.
5159
5160     """
5161     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5162     assert instance is not None, \
5163       "Cannot retrieve locked instance %s" % self.op.instance_name
5164     self.instance = instance
5165
5166     if instance.disk_template != constants.DT_DRBD8:
5167       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5168                                  " instances")
5169
5170     if len(instance.secondary_nodes) != 1:
5171       raise errors.OpPrereqError("The instance has a strange layout,"
5172                                  " expected one secondary but found %d" %
5173                                  len(instance.secondary_nodes))
5174
5175     self.sec_node = instance.secondary_nodes[0]
5176
5177     if self.op.iallocator is not None:
5178       self._RunAllocator()
5179
5180     remote_node = self.op.remote_node
5181     if remote_node is not None:
5182       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5183       assert self.remote_node_info is not None, \
5184         "Cannot retrieve locked node %s" % remote_node
5185     else:
5186       self.remote_node_info = None
5187     if remote_node == instance.primary_node:
5188       raise errors.OpPrereqError("The specified node is the primary node of"
5189                                  " the instance.")
5190     elif remote_node == self.sec_node:
5191       raise errors.OpPrereqError("The specified node is already the"
5192                                  " secondary node of the instance.")
5193
5194     if self.op.mode == constants.REPLACE_DISK_PRI:
5195       n1 = self.tgt_node = instance.primary_node
5196       n2 = self.oth_node = self.sec_node
5197     elif self.op.mode == constants.REPLACE_DISK_SEC:
5198       n1 = self.tgt_node = self.sec_node
5199       n2 = self.oth_node = instance.primary_node
5200     elif self.op.mode == constants.REPLACE_DISK_CHG:
5201       n1 = self.new_node = remote_node
5202       n2 = self.oth_node = instance.primary_node
5203       self.tgt_node = self.sec_node
5204       _CheckNodeNotDrained(self, remote_node)
5205     else:
5206       raise errors.ProgrammerError("Unhandled disk replace mode")
5207
5208     _CheckNodeOnline(self, n1)
5209     _CheckNodeOnline(self, n2)
5210
5211     if not self.op.disks:
5212       self.op.disks = range(len(instance.disks))
5213
5214     for disk_idx in self.op.disks:
5215       instance.FindDisk(disk_idx)
5216
5217   def _ExecD8DiskOnly(self, feedback_fn):
5218     """Replace a disk on the primary or secondary for dbrd8.
5219
5220     The algorithm for replace is quite complicated:
5221
5222       1. for each disk to be replaced:
5223
5224         1. create new LVs on the target node with unique names
5225         1. detach old LVs from the drbd device
5226         1. rename old LVs to name_replaced.<time_t>
5227         1. rename new LVs to old LVs
5228         1. attach the new LVs (with the old names now) to the drbd device
5229
5230       1. wait for sync across all devices
5231
5232       1. for each modified disk:
5233
5234         1. remove old LVs (which have the name name_replaces.<time_t>)
5235
5236     Failures are not very well handled.
5237
5238     """
5239     steps_total = 6
5240     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5241     instance = self.instance
5242     iv_names = {}
5243     vgname = self.cfg.GetVGName()
5244     # start of work
5245     cfg = self.cfg
5246     tgt_node = self.tgt_node
5247     oth_node = self.oth_node
5248
5249     # Step: check device activation
5250     self.proc.LogStep(1, steps_total, "check device existence")
5251     info("checking volume groups")
5252     my_vg = cfg.GetVGName()
5253     results = self.rpc.call_vg_list([oth_node, tgt_node])
5254     if not results:
5255       raise errors.OpExecError("Can't list volume groups on the nodes")
5256     for node in oth_node, tgt_node:
5257       res = results[node]
5258       res.Raise("Error checking node %s" % node)
5259       if my_vg not in res.payload:
5260         raise errors.OpExecError("Volume group '%s' not found on %s" %
5261                                  (my_vg, node))
5262     for idx, dev in enumerate(instance.disks):
5263       if idx not in self.op.disks:
5264         continue
5265       for node in tgt_node, oth_node:
5266         info("checking disk/%d on %s" % (idx, node))
5267         cfg.SetDiskID(dev, node)
5268         result = self.rpc.call_blockdev_find(node, dev)
5269         msg = result.fail_msg
5270         if not msg and not result.payload:
5271           msg = "disk not found"
5272         if msg:
5273           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5274                                    (idx, node, msg))
5275
5276     # Step: check other node consistency
5277     self.proc.LogStep(2, steps_total, "check peer consistency")
5278     for idx, dev in enumerate(instance.disks):
5279       if idx not in self.op.disks:
5280         continue
5281       info("checking disk/%d consistency on %s" % (idx, oth_node))
5282       if not _CheckDiskConsistency(self, dev, oth_node,
5283                                    oth_node==instance.primary_node):
5284         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5285                                  " to replace disks on this node (%s)" %
5286                                  (oth_node, tgt_node))
5287
5288     # Step: create new storage
5289     self.proc.LogStep(3, steps_total, "allocate new storage")
5290     for idx, dev in enumerate(instance.disks):
5291       if idx not in self.op.disks:
5292         continue
5293       size = dev.size
5294       cfg.SetDiskID(dev, tgt_node)
5295       lv_names = [".disk%d_%s" % (idx, suf)
5296                   for suf in ["data", "meta"]]
5297       names = _GenerateUniqueNames(self, lv_names)
5298       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5299                              logical_id=(vgname, names[0]))
5300       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5301                              logical_id=(vgname, names[1]))
5302       new_lvs = [lv_data, lv_meta]
5303       old_lvs = dev.children
5304       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5305       info("creating new local storage on %s for %s" %
5306            (tgt_node, dev.iv_name))
5307       # we pass force_create=True to force the LVM creation
5308       for new_lv in new_lvs:
5309         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5310                         _GetInstanceInfoText(instance), False)
5311
5312     # Step: for each lv, detach+rename*2+attach
5313     self.proc.LogStep(4, steps_total, "change drbd configuration")
5314     for dev, old_lvs, new_lvs in iv_names.itervalues():
5315       info("detaching %s drbd from local storage" % dev.iv_name)
5316       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5317       result.Raise("Can't detach drbd from local storage on node"
5318                    " %s for device %s" % (tgt_node, dev.iv_name))
5319       #dev.children = []
5320       #cfg.Update(instance)
5321
5322       # ok, we created the new LVs, so now we know we have the needed
5323       # storage; as such, we proceed on the target node to rename
5324       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5325       # using the assumption that logical_id == physical_id (which in
5326       # turn is the unique_id on that node)
5327
5328       # FIXME(iustin): use a better name for the replaced LVs
5329       temp_suffix = int(time.time())
5330       ren_fn = lambda d, suff: (d.physical_id[0],
5331                                 d.physical_id[1] + "_replaced-%s" % suff)
5332       # build the rename list based on what LVs exist on the node
5333       rlist = []
5334       for to_ren in old_lvs:
5335         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5336         if not result.fail_msg and result.payload:
5337           # device exists
5338           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5339
5340       info("renaming the old LVs on the target node")
5341       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5342       result.Raise("Can't rename old LVs on node %s" % tgt_node)
5343       # now we rename the new LVs to the old LVs
5344       info("renaming the new LVs on the target node")
5345       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5346       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5347       result.Raise("Can't rename new LVs on node %s" % tgt_node)
5348
5349       for old, new in zip(old_lvs, new_lvs):
5350         new.logical_id = old.logical_id
5351         cfg.SetDiskID(new, tgt_node)
5352
5353       for disk in old_lvs:
5354         disk.logical_id = ren_fn(disk, temp_suffix)
5355         cfg.SetDiskID(disk, tgt_node)
5356
5357       # now that the new lvs have the old name, we can add them to the device
5358       info("adding new mirror component on %s" % tgt_node)
5359       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5360       msg = result.fail_msg
5361       if msg:
5362         for new_lv in new_lvs:
5363           msg2 = self.rpc.call_blockdev_remove(tgt_node, new_lv).fail_msg
5364           if msg2:
5365             warning("Can't rollback device %s: %s", dev, msg2,
5366                     hint="cleanup manually the unused logical volumes")
5367         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5368
5369       dev.children = new_lvs
5370       cfg.Update(instance)
5371
5372     # Step: wait for sync
5373
5374     # this can fail as the old devices are degraded and _WaitForSync
5375     # does a combined result over all disks, so we don't check its
5376     # return value
5377     self.proc.LogStep(5, steps_total, "sync devices")
5378     _WaitForSync(self, instance, unlock=True)
5379
5380     # so check manually all the devices
5381     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5382       cfg.SetDiskID(dev, instance.primary_node)
5383       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5384       msg = result.fail_msg
5385       if not msg and not result.payload:
5386         msg = "disk not found"
5387       if msg:
5388         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5389                                  (name, msg))
5390       if result.payload[5]:
5391         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5392
5393     # Step: remove old storage
5394     self.proc.LogStep(6, steps_total, "removing old storage")
5395     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5396       info("remove logical volumes for %s" % name)
5397       for lv in old_lvs:
5398         cfg.SetDiskID(lv, tgt_node)
5399         msg = self.rpc.call_blockdev_remove(tgt_node, lv).fail_msg
5400         if msg:
5401           warning("Can't remove old LV: %s" % msg,
5402                   hint="manually remove unused LVs")
5403           continue
5404
5405   def _ExecD8Secondary(self, feedback_fn):
5406     """Replace the secondary node for drbd8.
5407
5408     The algorithm for replace is quite complicated:
5409       - for all disks of the instance:
5410         - create new LVs on the new node with same names
5411         - shutdown the drbd device on the old secondary
5412         - disconnect the drbd network on the primary
5413         - create the drbd device on the new secondary
5414         - network attach the drbd on the primary, using an artifice:
5415           the drbd code for Attach() will connect to the network if it
5416           finds a device which is connected to the good local disks but
5417           not network enabled
5418       - wait for sync across all devices
5419       - remove all disks from the old secondary
5420
5421     Failures are not very well handled.
5422
5423     """
5424     steps_total = 6
5425     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5426     instance = self.instance
5427     iv_names = {}
5428     # start of work
5429     cfg = self.cfg
5430     old_node = self.tgt_node
5431     new_node = self.new_node
5432     pri_node = instance.primary_node
5433     nodes_ip = {
5434       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5435       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5436       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5437       }
5438
5439     # Step: check device activation
5440     self.proc.LogStep(1, steps_total, "check device existence")
5441     info("checking volume groups")
5442     my_vg = cfg.GetVGName()
5443     results = self.rpc.call_vg_list([pri_node, new_node])
5444     for node in pri_node, new_node:
5445       res = results[node]
5446       res.Raise("Error checking node %s" % node)
5447       if my_vg not in res.payload:
5448         raise errors.OpExecError("Volume group '%s' not found on %s" %
5449                                  (my_vg, node))
5450     for idx, dev in enumerate(instance.disks):
5451       if idx not in self.op.disks:
5452         continue
5453       info("checking disk/%d on %s" % (idx, pri_node))
5454       cfg.SetDiskID(dev, pri_node)
5455       result = self.rpc.call_blockdev_find(pri_node, dev)
5456       msg = result.fail_msg
5457       if not msg and not result.payload:
5458         msg = "disk not found"
5459       if msg:
5460         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5461                                  (idx, pri_node, msg))
5462
5463     # Step: check other node consistency
5464     self.proc.LogStep(2, steps_total, "check peer consistency")
5465     for idx, dev in enumerate(instance.disks):
5466       if idx not in self.op.disks:
5467         continue
5468       info("checking disk/%d consistency on %s" % (idx, pri_node))
5469       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5470         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5471                                  " unsafe to replace the secondary" %
5472                                  pri_node)
5473
5474     # Step: create new storage
5475     self.proc.LogStep(3, steps_total, "allocate new storage")
5476     for idx, dev in enumerate(instance.disks):
5477       info("adding new local storage on %s for disk/%d" %
5478            (new_node, idx))
5479       # we pass force_create=True to force LVM creation
5480       for new_lv in dev.children:
5481         _CreateBlockDev(self, new_node, instance, new_lv, True,
5482                         _GetInstanceInfoText(instance), False)
5483
5484     # Step 4: dbrd minors and drbd setups changes
5485     # after this, we must manually remove the drbd minors on both the
5486     # error and the success paths
5487     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5488                                    instance.name)
5489     logging.debug("Allocated minors %s" % (minors,))
5490     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5491     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5492       size = dev.size
5493       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5494       # create new devices on new_node; note that we create two IDs:
5495       # one without port, so the drbd will be activated without
5496       # networking information on the new node at this stage, and one
5497       # with network, for the latter activation in step 4
5498       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5499       if pri_node == o_node1:
5500         p_minor = o_minor1
5501       else:
5502         p_minor = o_minor2
5503
5504       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5505       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5506
5507       iv_names[idx] = (dev, dev.children, new_net_id)
5508       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5509                     new_net_id)
5510       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5511                               logical_id=new_alone_id,
5512                               children=dev.children,
5513                               size=dev.size)
5514       try:
5515         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5516                               _GetInstanceInfoText(instance), False)
5517       except errors.GenericError:
5518         self.cfg.ReleaseDRBDMinors(instance.name)
5519         raise
5520
5521     for idx, dev in enumerate(instance.disks):
5522       # we have new devices, shutdown the drbd on the old secondary
5523       info("shutting down drbd for disk/%d on old node" % idx)
5524       cfg.SetDiskID(dev, old_node)
5525       msg = self.rpc.call_blockdev_shutdown(old_node, dev).fail_msg
5526       if msg:
5527         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5528                 (idx, msg),
5529                 hint="Please cleanup this device manually as soon as possible")
5530
5531     info("detaching primary drbds from the network (=> standalone)")
5532     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5533                                                instance.disks)[pri_node]
5534
5535     msg = result.fail_msg
5536     if msg:
5537       # detaches didn't succeed (unlikely)
5538       self.cfg.ReleaseDRBDMinors(instance.name)
5539       raise errors.OpExecError("Can't detach the disks from the network on"
5540                                " old node: %s" % (msg,))
5541
5542     # if we managed to detach at least one, we update all the disks of
5543     # the instance to point to the new secondary
5544     info("updating instance configuration")
5545     for dev, _, new_logical_id in iv_names.itervalues():
5546       dev.logical_id = new_logical_id
5547       cfg.SetDiskID(dev, pri_node)
5548     cfg.Update(instance)
5549
5550     # and now perform the drbd attach
5551     info("attaching primary drbds to new secondary (standalone => connected)")
5552     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5553                                            instance.disks, instance.name,
5554                                            False)
5555     for to_node, to_result in result.items():
5556       msg = to_result.fail_msg
5557       if msg:
5558         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5559                 hint="please do a gnt-instance info to see the"
5560                 " status of disks")
5561
5562     # this can fail as the old devices are degraded and _WaitForSync
5563     # does a combined result over all disks, so we don't check its
5564     # return value
5565     self.proc.LogStep(5, steps_total, "sync devices")
5566     _WaitForSync(self, instance, unlock=True)
5567
5568     # so check manually all the devices
5569     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5570       cfg.SetDiskID(dev, pri_node)
5571       result = self.rpc.call_blockdev_find(pri_node, dev)
5572       msg = result.fail_msg
5573       if not msg and not result.payload:
5574         msg = "disk not found"
5575       if msg:
5576         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5577                                  (idx, msg))
5578       if result.payload[5]:
5579         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5580
5581     self.proc.LogStep(6, steps_total, "removing old storage")
5582     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5583       info("remove logical volumes for disk/%d" % idx)
5584       for lv in old_lvs:
5585         cfg.SetDiskID(lv, old_node)
5586         msg = self.rpc.call_blockdev_remove(old_node, lv).fail_msg
5587         if msg:
5588           warning("Can't remove LV on old secondary: %s", msg,
5589                   hint="Cleanup stale volumes by hand")
5590
5591   def Exec(self, feedback_fn):
5592     """Execute disk replacement.
5593
5594     This dispatches the disk replacement to the appropriate handler.
5595
5596     """
5597     instance = self.instance
5598
5599     # Activate the instance disks if we're replacing them on a down instance
5600     if not instance.admin_up:
5601       _StartInstanceDisks(self, instance, True)
5602
5603     if self.op.mode == constants.REPLACE_DISK_CHG:
5604       fn = self._ExecD8Secondary
5605     else:
5606       fn = self._ExecD8DiskOnly
5607
5608     ret = fn(feedback_fn)
5609
5610     # Deactivate the instance disks if we're replacing them on a down instance
5611     if not instance.admin_up:
5612       _SafeShutdownInstanceDisks(self, instance)
5613
5614     return ret
5615
5616
5617 class LUGrowDisk(LogicalUnit):
5618   """Grow a disk of an instance.
5619
5620   """
5621   HPATH = "disk-grow"
5622   HTYPE = constants.HTYPE_INSTANCE
5623   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5624   REQ_BGL = False
5625
5626   def ExpandNames(self):
5627     self._ExpandAndLockInstance()
5628     self.needed_locks[locking.LEVEL_NODE] = []
5629     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5630
5631   def DeclareLocks(self, level):
5632     if level == locking.LEVEL_NODE:
5633       self._LockInstancesNodes()
5634
5635   def BuildHooksEnv(self):
5636     """Build hooks env.
5637
5638     This runs on the master, the primary and all the secondaries.
5639
5640     """
5641     env = {
5642       "DISK": self.op.disk,
5643       "AMOUNT": self.op.amount,
5644       }
5645     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5646     nl = [
5647       self.cfg.GetMasterNode(),
5648       self.instance.primary_node,
5649       ]
5650     return env, nl, nl
5651
5652   def CheckPrereq(self):
5653     """Check prerequisites.
5654
5655     This checks that the instance is in the cluster.
5656
5657     """
5658     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5659     assert instance is not None, \
5660       "Cannot retrieve locked instance %s" % self.op.instance_name
5661     nodenames = list(instance.all_nodes)
5662     for node in nodenames:
5663       _CheckNodeOnline(self, node)
5664
5665
5666     self.instance = instance
5667
5668     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5669       raise errors.OpPrereqError("Instance's disk layout does not support"
5670                                  " growing.")
5671
5672     self.disk = instance.FindDisk(self.op.disk)
5673
5674     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5675                                        instance.hypervisor)
5676     for node in nodenames:
5677       info = nodeinfo[node]
5678       info.Raise("Cannot get current information from node %s" % node)
5679       vg_free = info.payload.get('vg_free', None)
5680       if not isinstance(vg_free, int):
5681         raise errors.OpPrereqError("Can't compute free disk space on"
5682                                    " node %s" % node)
5683       if self.op.amount > vg_free:
5684         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5685                                    " %d MiB available, %d MiB required" %
5686                                    (node, vg_free, self.op.amount))
5687
5688   def Exec(self, feedback_fn):
5689     """Execute disk grow.
5690
5691     """
5692     instance = self.instance
5693     disk = self.disk
5694     for node in instance.all_nodes:
5695       self.cfg.SetDiskID(disk, node)
5696       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5697       result.Raise("Grow request failed to node %s" % node)
5698     disk.RecordGrow(self.op.amount)
5699     self.cfg.Update(instance)
5700     if self.op.wait_for_sync:
5701       disk_abort = not _WaitForSync(self, instance)
5702       if disk_abort:
5703         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5704                              " status.\nPlease check the instance.")
5705
5706
5707 class LUQueryInstanceData(NoHooksLU):
5708   """Query runtime instance data.
5709
5710   """
5711   _OP_REQP = ["instances", "static"]
5712   REQ_BGL = False
5713
5714   def ExpandNames(self):
5715     self.needed_locks = {}
5716     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5717
5718     if not isinstance(self.op.instances, list):
5719       raise errors.OpPrereqError("Invalid argument type 'instances'")
5720
5721     if self.op.instances:
5722       self.wanted_names = []
5723       for name in self.op.instances:
5724         full_name = self.cfg.ExpandInstanceName(name)
5725         if full_name is None:
5726           raise errors.OpPrereqError("Instance '%s' not known" % name)
5727         self.wanted_names.append(full_name)
5728       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5729     else:
5730       self.wanted_names = None
5731       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5732
5733     self.needed_locks[locking.LEVEL_NODE] = []
5734     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5735
5736   def DeclareLocks(self, level):
5737     if level == locking.LEVEL_NODE:
5738       self._LockInstancesNodes()
5739
5740   def CheckPrereq(self):
5741     """Check prerequisites.
5742
5743     This only checks the optional instance list against the existing names.
5744
5745     """
5746     if self.wanted_names is None:
5747       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5748
5749     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5750                              in self.wanted_names]
5751     return
5752
5753   def _ComputeDiskStatus(self, instance, snode, dev):
5754     """Compute block device status.
5755
5756     """
5757     static = self.op.static
5758     if not static:
5759       self.cfg.SetDiskID(dev, instance.primary_node)
5760       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5761       if dev_pstatus.offline:
5762         dev_pstatus = None
5763       else:
5764         dev_pstatus.Raise("Can't compute disk status for %s" % instance.name)
5765         dev_pstatus = dev_pstatus.payload
5766     else:
5767       dev_pstatus = None
5768
5769     if dev.dev_type in constants.LDS_DRBD:
5770       # we change the snode then (otherwise we use the one passed in)
5771       if dev.logical_id[0] == instance.primary_node:
5772         snode = dev.logical_id[1]
5773       else:
5774         snode = dev.logical_id[0]
5775
5776     if snode and not static:
5777       self.cfg.SetDiskID(dev, snode)
5778       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5779       if dev_sstatus.offline:
5780         dev_sstatus = None
5781       else:
5782         dev_sstatus.Raise("Can't compute disk status for %s" % instance.name)
5783         dev_sstatus = dev_sstatus.payload
5784     else:
5785       dev_sstatus = None
5786
5787     if dev.children:
5788       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5789                       for child in dev.children]
5790     else:
5791       dev_children = []
5792
5793     data = {
5794       "iv_name": dev.iv_name,
5795       "dev_type": dev.dev_type,
5796       "logical_id": dev.logical_id,
5797       "physical_id": dev.physical_id,
5798       "pstatus": dev_pstatus,
5799       "sstatus": dev_sstatus,
5800       "children": dev_children,
5801       "mode": dev.mode,
5802       }
5803
5804     return data
5805
5806   def Exec(self, feedback_fn):
5807     """Gather and return data"""
5808     result = {}
5809
5810     cluster = self.cfg.GetClusterInfo()
5811
5812     for instance in self.wanted_instances:
5813       if not self.op.static:
5814         remote_info = self.rpc.call_instance_info(instance.primary_node,
5815                                                   instance.name,
5816                                                   instance.hypervisor)
5817         remote_info.Raise("Error checking node %s" % instance.primary_node)
5818         remote_info = remote_info.payload
5819         if remote_info and "state" in remote_info:
5820           remote_state = "up"
5821         else:
5822           remote_state = "down"
5823       else:
5824         remote_state = None
5825       if instance.admin_up:
5826         config_state = "up"
5827       else:
5828         config_state = "down"
5829
5830       disks = [self._ComputeDiskStatus(instance, None, device)
5831                for device in instance.disks]
5832
5833       idict = {
5834         "name": instance.name,
5835         "config_state": config_state,
5836         "run_state": remote_state,
5837         "pnode": instance.primary_node,
5838         "snodes": instance.secondary_nodes,
5839         "os": instance.os,
5840         # this happens to be the same format used for hooks
5841         "nics": _NICListToTuple(self, instance.nics),
5842         "disks": disks,
5843         "hypervisor": instance.hypervisor,
5844         "network_port": instance.network_port,
5845         "hv_instance": instance.hvparams,
5846         "hv_actual": cluster.FillHV(instance),
5847         "be_instance": instance.beparams,
5848         "be_actual": cluster.FillBE(instance),
5849         }
5850
5851       result[instance.name] = idict
5852
5853     return result
5854
5855
5856 class LUSetInstanceParams(LogicalUnit):
5857   """Modifies an instances's parameters.
5858
5859   """
5860   HPATH = "instance-modify"
5861   HTYPE = constants.HTYPE_INSTANCE
5862   _OP_REQP = ["instance_name"]
5863   REQ_BGL = False
5864
5865   def CheckArguments(self):
5866     if not hasattr(self.op, 'nics'):
5867       self.op.nics = []
5868     if not hasattr(self.op, 'disks'):
5869       self.op.disks = []
5870     if not hasattr(self.op, 'beparams'):
5871       self.op.beparams = {}
5872     if not hasattr(self.op, 'hvparams'):
5873       self.op.hvparams = {}
5874     self.op.force = getattr(self.op, "force", False)
5875     if not (self.op.nics or self.op.disks or
5876             self.op.hvparams or self.op.beparams):
5877       raise errors.OpPrereqError("No changes submitted")
5878
5879     # Disk validation
5880     disk_addremove = 0
5881     for disk_op, disk_dict in self.op.disks:
5882       if disk_op == constants.DDM_REMOVE:
5883         disk_addremove += 1
5884         continue
5885       elif disk_op == constants.DDM_ADD:
5886         disk_addremove += 1
5887       else:
5888         if not isinstance(disk_op, int):
5889           raise errors.OpPrereqError("Invalid disk index")
5890         if not isinstance(disk_dict, dict):
5891           msg = "Invalid disk value: expected dict, got '%s'" % disk_dict
5892           raise errors.OpPrereqError(msg)
5893
5894       if disk_op == constants.DDM_ADD:
5895         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5896         if mode not in constants.DISK_ACCESS_SET:
5897           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5898         size = disk_dict.get('size', None)
5899         if size is None:
5900           raise errors.OpPrereqError("Required disk parameter size missing")
5901         try:
5902           size = int(size)
5903         except ValueError, err:
5904           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5905                                      str(err))
5906         disk_dict['size'] = size
5907       else:
5908         # modification of disk
5909         if 'size' in disk_dict:
5910           raise errors.OpPrereqError("Disk size change not possible, use"
5911                                      " grow-disk")
5912
5913     if disk_addremove > 1:
5914       raise errors.OpPrereqError("Only one disk add or remove operation"
5915                                  " supported at a time")
5916
5917     # NIC validation
5918     nic_addremove = 0
5919     for nic_op, nic_dict in self.op.nics:
5920       if nic_op == constants.DDM_REMOVE:
5921         nic_addremove += 1
5922         continue
5923       elif nic_op == constants.DDM_ADD:
5924         nic_addremove += 1
5925       else:
5926         if not isinstance(nic_op, int):
5927           raise errors.OpPrereqError("Invalid nic index")
5928         if not isinstance(nic_dict, dict):
5929           msg = "Invalid nic value: expected dict, got '%s'" % nic_dict
5930           raise errors.OpPrereqError(msg)
5931
5932       # nic_dict should be a dict
5933       nic_ip = nic_dict.get('ip', None)
5934       if nic_ip is not None:
5935         if nic_ip.lower() == constants.VALUE_NONE:
5936           nic_dict['ip'] = None
5937         else:
5938           if not utils.IsValidIP(nic_ip):
5939             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5940
5941       nic_bridge = nic_dict.get('bridge', None)
5942       nic_link = nic_dict.get('link', None)
5943       if nic_bridge and nic_link:
5944         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
5945                                    " at the same time")
5946       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5947         nic_dict['bridge'] = None
5948       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5949         nic_dict['link'] = None
5950
5951       if nic_op == constants.DDM_ADD:
5952         nic_mac = nic_dict.get('mac', None)
5953         if nic_mac is None:
5954           nic_dict['mac'] = constants.VALUE_AUTO
5955
5956       if 'mac' in nic_dict:
5957         nic_mac = nic_dict['mac']
5958         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5959           if not utils.IsValidMac(nic_mac):
5960             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5961         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5962           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5963                                      " modifying an existing nic")
5964
5965     if nic_addremove > 1:
5966       raise errors.OpPrereqError("Only one NIC add or remove operation"
5967                                  " supported at a time")
5968
5969   def ExpandNames(self):
5970     self._ExpandAndLockInstance()
5971     self.needed_locks[locking.LEVEL_NODE] = []
5972     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5973
5974   def DeclareLocks(self, level):
5975     if level == locking.LEVEL_NODE:
5976       self._LockInstancesNodes()
5977
5978   def BuildHooksEnv(self):
5979     """Build hooks env.
5980
5981     This runs on the master, primary and secondaries.
5982
5983     """
5984     args = dict()
5985     if constants.BE_MEMORY in self.be_new:
5986       args['memory'] = self.be_new[constants.BE_MEMORY]
5987     if constants.BE_VCPUS in self.be_new:
5988       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5989     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5990     # information at all.
5991     if self.op.nics:
5992       args['nics'] = []
5993       nic_override = dict(self.op.nics)
5994       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5995       for idx, nic in enumerate(self.instance.nics):
5996         if idx in nic_override:
5997           this_nic_override = nic_override[idx]
5998         else:
5999           this_nic_override = {}
6000         if 'ip' in this_nic_override:
6001           ip = this_nic_override['ip']
6002         else:
6003           ip = nic.ip
6004         if 'mac' in this_nic_override:
6005           mac = this_nic_override['mac']
6006         else:
6007           mac = nic.mac
6008         if idx in self.nic_pnew:
6009           nicparams = self.nic_pnew[idx]
6010         else:
6011           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6012         mode = nicparams[constants.NIC_MODE]
6013         link = nicparams[constants.NIC_LINK]
6014         args['nics'].append((ip, mac, mode, link))
6015       if constants.DDM_ADD in nic_override:
6016         ip = nic_override[constants.DDM_ADD].get('ip', None)
6017         mac = nic_override[constants.DDM_ADD]['mac']
6018         nicparams = self.nic_pnew[constants.DDM_ADD]
6019         mode = nicparams[constants.NIC_MODE]
6020         link = nicparams[constants.NIC_LINK]
6021         args['nics'].append((ip, mac, mode, link))
6022       elif constants.DDM_REMOVE in nic_override:
6023         del args['nics'][-1]
6024
6025     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6026     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6027     return env, nl, nl
6028
6029   def _GetUpdatedParams(self, old_params, update_dict,
6030                         default_values, parameter_types):
6031     """Return the new params dict for the given params.
6032
6033     @type old_params: dict
6034     @param old_params: old parameters
6035     @type update_dict: dict
6036     @param update_dict: dict containing new parameter values,
6037                         or constants.VALUE_DEFAULT to reset the
6038                         parameter to its default value
6039     @type default_values: dict
6040     @param default_values: default values for the filled parameters
6041     @type parameter_types: dict
6042     @param parameter_types: dict mapping target dict keys to types
6043                             in constants.ENFORCEABLE_TYPES
6044     @rtype: (dict, dict)
6045     @return: (new_parameters, filled_parameters)
6046
6047     """
6048     params_copy = copy.deepcopy(old_params)
6049     for key, val in update_dict.iteritems():
6050       if val == constants.VALUE_DEFAULT:
6051         try:
6052           del params_copy[key]
6053         except KeyError:
6054           pass
6055       else:
6056         params_copy[key] = val
6057     utils.ForceDictType(params_copy, parameter_types)
6058     params_filled = objects.FillDict(default_values, params_copy)
6059     return (params_copy, params_filled)
6060
6061   def CheckPrereq(self):
6062     """Check prerequisites.
6063
6064     This only checks the instance list against the existing names.
6065
6066     """
6067     force = self.force = self.op.force
6068
6069     # checking the new params on the primary/secondary nodes
6070
6071     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6072     cluster = self.cluster = self.cfg.GetClusterInfo()
6073     assert self.instance is not None, \
6074       "Cannot retrieve locked instance %s" % self.op.instance_name
6075     pnode = instance.primary_node
6076     nodelist = list(instance.all_nodes)
6077
6078     # hvparams processing
6079     if self.op.hvparams:
6080       i_hvdict, hv_new = self._GetUpdatedParams(
6081                              instance.hvparams, self.op.hvparams,
6082                              cluster.hvparams[instance.hypervisor],
6083                              constants.HVS_PARAMETER_TYPES)
6084       # local check
6085       hypervisor.GetHypervisor(
6086         instance.hypervisor).CheckParameterSyntax(hv_new)
6087       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6088       self.hv_new = hv_new # the new actual values
6089       self.hv_inst = i_hvdict # the new dict (without defaults)
6090     else:
6091       self.hv_new = self.hv_inst = {}
6092
6093     # beparams processing
6094     if self.op.beparams:
6095       i_bedict, be_new = self._GetUpdatedParams(
6096                              instance.beparams, self.op.beparams,
6097                              cluster.beparams[constants.PP_DEFAULT],
6098                              constants.BES_PARAMETER_TYPES)
6099       self.be_new = be_new # the new actual values
6100       self.be_inst = i_bedict # the new dict (without defaults)
6101     else:
6102       self.be_new = self.be_inst = {}
6103
6104     self.warn = []
6105
6106     if constants.BE_MEMORY in self.op.beparams and not self.force:
6107       mem_check_list = [pnode]
6108       if be_new[constants.BE_AUTO_BALANCE]:
6109         # either we changed auto_balance to yes or it was from before
6110         mem_check_list.extend(instance.secondary_nodes)
6111       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6112                                                   instance.hypervisor)
6113       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6114                                          instance.hypervisor)
6115       pninfo = nodeinfo[pnode]
6116       msg = pninfo.fail_msg
6117       if msg:
6118         # Assume the primary node is unreachable and go ahead
6119         self.warn.append("Can't get info from primary node %s: %s" %
6120                          (pnode,  msg))
6121       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6122         self.warn.append("Node data from primary node %s doesn't contain"
6123                          " free memory information" % pnode)
6124       elif instance_info.fail_msg:
6125         self.warn.append("Can't get instance runtime information: %s" %
6126                         instance_info.fail_msg)
6127       else:
6128         if instance_info.payload:
6129           current_mem = int(instance_info.payload['memory'])
6130         else:
6131           # Assume instance not running
6132           # (there is a slight race condition here, but it's not very probable,
6133           # and we have no other way to check)
6134           current_mem = 0
6135         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6136                     pninfo.payload['memory_free'])
6137         if miss_mem > 0:
6138           raise errors.OpPrereqError("This change will prevent the instance"
6139                                      " from starting, due to %d MB of memory"
6140                                      " missing on its primary node" % miss_mem)
6141
6142       if be_new[constants.BE_AUTO_BALANCE]:
6143         for node, nres in nodeinfo.items():
6144           if node not in instance.secondary_nodes:
6145             continue
6146           msg = nres.fail_msg
6147           if msg:
6148             self.warn.append("Can't get info from secondary node %s: %s" %
6149                              (node, msg))
6150           elif not isinstance(nres.payload.get('memory_free', None), int):
6151             self.warn.append("Secondary node %s didn't return free"
6152                              " memory information" % node)
6153           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6154             self.warn.append("Not enough memory to failover instance to"
6155                              " secondary node %s" % node)
6156
6157     # NIC processing
6158     self.nic_pnew = {}
6159     self.nic_pinst = {}
6160     for nic_op, nic_dict in self.op.nics:
6161       if nic_op == constants.DDM_REMOVE:
6162         if not instance.nics:
6163           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6164         continue
6165       if nic_op != constants.DDM_ADD:
6166         # an existing nic
6167         if nic_op < 0 or nic_op >= len(instance.nics):
6168           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6169                                      " are 0 to %d" %
6170                                      (nic_op, len(instance.nics)))
6171         old_nic_params = instance.nics[nic_op].nicparams
6172         old_nic_ip = instance.nics[nic_op].ip
6173       else:
6174         old_nic_params = {}
6175         old_nic_ip = None
6176
6177       update_params_dict = dict([(key, nic_dict[key])
6178                                  for key in constants.NICS_PARAMETERS
6179                                  if key in nic_dict])
6180
6181       if 'bridge' in nic_dict:
6182         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6183
6184       new_nic_params, new_filled_nic_params = \
6185           self._GetUpdatedParams(old_nic_params, update_params_dict,
6186                                  cluster.nicparams[constants.PP_DEFAULT],
6187                                  constants.NICS_PARAMETER_TYPES)
6188       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6189       self.nic_pinst[nic_op] = new_nic_params
6190       self.nic_pnew[nic_op] = new_filled_nic_params
6191       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6192
6193       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6194         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6195         msg = self.rpc.call_bridges_exist(pnode, [nic_bridge]).fail_msg
6196         if msg:
6197           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6198           if self.force:
6199             self.warn.append(msg)
6200           else:
6201             raise errors.OpPrereqError(msg)
6202       if new_nic_mode == constants.NIC_MODE_ROUTED:
6203         if 'ip' in nic_dict:
6204           nic_ip = nic_dict['ip']
6205         else:
6206           nic_ip = old_nic_ip
6207         if nic_ip is None:
6208           raise errors.OpPrereqError('Cannot set the nic ip to None'
6209                                      ' on a routed nic')
6210       if 'mac' in nic_dict:
6211         nic_mac = nic_dict['mac']
6212         if nic_mac is None:
6213           raise errors.OpPrereqError('Cannot set the nic mac to None')
6214         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6215           # otherwise generate the mac
6216           nic_dict['mac'] = self.cfg.GenerateMAC()
6217         else:
6218           # or validate/reserve the current one
6219           if self.cfg.IsMacInUse(nic_mac):
6220             raise errors.OpPrereqError("MAC address %s already in use"
6221                                        " in cluster" % nic_mac)
6222
6223     # DISK processing
6224     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6225       raise errors.OpPrereqError("Disk operations not supported for"
6226                                  " diskless instances")
6227     for disk_op, disk_dict in self.op.disks:
6228       if disk_op == constants.DDM_REMOVE:
6229         if len(instance.disks) == 1:
6230           raise errors.OpPrereqError("Cannot remove the last disk of"
6231                                      " an instance")
6232         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6233         ins_l = ins_l[pnode]
6234         msg = ins_l.fail_msg
6235         if msg:
6236           raise errors.OpPrereqError("Can't contact node %s: %s" %
6237                                      (pnode, msg))
6238         if instance.name in ins_l.payload:
6239           raise errors.OpPrereqError("Instance is running, can't remove"
6240                                      " disks.")
6241
6242       if (disk_op == constants.DDM_ADD and
6243           len(instance.nics) >= constants.MAX_DISKS):
6244         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6245                                    " add more" % constants.MAX_DISKS)
6246       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6247         # an existing disk
6248         if disk_op < 0 or disk_op >= len(instance.disks):
6249           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6250                                      " are 0 to %d" %
6251                                      (disk_op, len(instance.disks)))
6252
6253     return
6254
6255   def Exec(self, feedback_fn):
6256     """Modifies an instance.
6257
6258     All parameters take effect only at the next restart of the instance.
6259
6260     """
6261     # Process here the warnings from CheckPrereq, as we don't have a
6262     # feedback_fn there.
6263     for warn in self.warn:
6264       feedback_fn("WARNING: %s" % warn)
6265
6266     result = []
6267     instance = self.instance
6268     cluster = self.cluster
6269     # disk changes
6270     for disk_op, disk_dict in self.op.disks:
6271       if disk_op == constants.DDM_REMOVE:
6272         # remove the last disk
6273         device = instance.disks.pop()
6274         device_idx = len(instance.disks)
6275         for node, disk in device.ComputeNodeTree(instance.primary_node):
6276           self.cfg.SetDiskID(disk, node)
6277           msg = self.rpc.call_blockdev_remove(node, disk).fail_msg
6278           if msg:
6279             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6280                             " continuing anyway", device_idx, node, msg)
6281         result.append(("disk/%d" % device_idx, "remove"))
6282       elif disk_op == constants.DDM_ADD:
6283         # add a new disk
6284         if instance.disk_template == constants.DT_FILE:
6285           file_driver, file_path = instance.disks[0].logical_id
6286           file_path = os.path.dirname(file_path)
6287         else:
6288           file_driver = file_path = None
6289         disk_idx_base = len(instance.disks)
6290         new_disk = _GenerateDiskTemplate(self,
6291                                          instance.disk_template,
6292                                          instance.name, instance.primary_node,
6293                                          instance.secondary_nodes,
6294                                          [disk_dict],
6295                                          file_path,
6296                                          file_driver,
6297                                          disk_idx_base)[0]
6298         instance.disks.append(new_disk)
6299         info = _GetInstanceInfoText(instance)
6300
6301         logging.info("Creating volume %s for instance %s",
6302                      new_disk.iv_name, instance.name)
6303         # Note: this needs to be kept in sync with _CreateDisks
6304         #HARDCODE
6305         for node in instance.all_nodes:
6306           f_create = node == instance.primary_node
6307           try:
6308             _CreateBlockDev(self, node, instance, new_disk,
6309                             f_create, info, f_create)
6310           except errors.OpExecError, err:
6311             self.LogWarning("Failed to create volume %s (%s) on"
6312                             " node %s: %s",
6313                             new_disk.iv_name, new_disk, node, err)
6314         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6315                        (new_disk.size, new_disk.mode)))
6316       else:
6317         # change a given disk
6318         instance.disks[disk_op].mode = disk_dict['mode']
6319         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6320     # NIC changes
6321     for nic_op, nic_dict in self.op.nics:
6322       if nic_op == constants.DDM_REMOVE:
6323         # remove the last nic
6324         del instance.nics[-1]
6325         result.append(("nic.%d" % len(instance.nics), "remove"))
6326       elif nic_op == constants.DDM_ADD:
6327         # mac and bridge should be set, by now
6328         mac = nic_dict['mac']
6329         ip = nic_dict.get('ip', None)
6330         nicparams = self.nic_pinst[constants.DDM_ADD]
6331         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6332         instance.nics.append(new_nic)
6333         result.append(("nic.%d" % (len(instance.nics) - 1),
6334                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6335                        (new_nic.mac, new_nic.ip,
6336                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6337                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6338                        )))
6339       else:
6340         for key in 'mac', 'ip':
6341           if key in nic_dict:
6342             setattr(instance.nics[nic_op], key, nic_dict[key])
6343         if nic_op in self.nic_pnew:
6344           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6345         for key, val in nic_dict.iteritems():
6346           result.append(("nic.%s/%d" % (key, nic_op), val))
6347
6348     # hvparams changes
6349     if self.op.hvparams:
6350       instance.hvparams = self.hv_inst
6351       for key, val in self.op.hvparams.iteritems():
6352         result.append(("hv/%s" % key, val))
6353
6354     # beparams changes
6355     if self.op.beparams:
6356       instance.beparams = self.be_inst
6357       for key, val in self.op.beparams.iteritems():
6358         result.append(("be/%s" % key, val))
6359
6360     self.cfg.Update(instance)
6361
6362     return result
6363
6364
6365 class LUQueryExports(NoHooksLU):
6366   """Query the exports list
6367
6368   """
6369   _OP_REQP = ['nodes']
6370   REQ_BGL = False
6371
6372   def ExpandNames(self):
6373     self.needed_locks = {}
6374     self.share_locks[locking.LEVEL_NODE] = 1
6375     if not self.op.nodes:
6376       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6377     else:
6378       self.needed_locks[locking.LEVEL_NODE] = \
6379         _GetWantedNodes(self, self.op.nodes)
6380
6381   def CheckPrereq(self):
6382     """Check prerequisites.
6383
6384     """
6385     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6386
6387   def Exec(self, feedback_fn):
6388     """Compute the list of all the exported system images.
6389
6390     @rtype: dict
6391     @return: a dictionary with the structure node->(export-list)
6392         where export-list is a list of the instances exported on
6393         that node.
6394
6395     """
6396     rpcresult = self.rpc.call_export_list(self.nodes)
6397     result = {}
6398     for node in rpcresult:
6399       if rpcresult[node].fail_msg:
6400         result[node] = False
6401       else:
6402         result[node] = rpcresult[node].payload
6403
6404     return result
6405
6406
6407 class LUExportInstance(LogicalUnit):
6408   """Export an instance to an image in the cluster.
6409
6410   """
6411   HPATH = "instance-export"
6412   HTYPE = constants.HTYPE_INSTANCE
6413   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6414   REQ_BGL = False
6415
6416   def ExpandNames(self):
6417     self._ExpandAndLockInstance()
6418     # FIXME: lock only instance primary and destination node
6419     #
6420     # Sad but true, for now we have do lock all nodes, as we don't know where
6421     # the previous export might be, and and in this LU we search for it and
6422     # remove it from its current node. In the future we could fix this by:
6423     #  - making a tasklet to search (share-lock all), then create the new one,
6424     #    then one to remove, after
6425     #  - removing the removal operation altoghether
6426     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6427
6428   def DeclareLocks(self, level):
6429     """Last minute lock declaration."""
6430     # All nodes are locked anyway, so nothing to do here.
6431
6432   def BuildHooksEnv(self):
6433     """Build hooks env.
6434
6435     This will run on the master, primary node and target node.
6436
6437     """
6438     env = {
6439       "EXPORT_NODE": self.op.target_node,
6440       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6441       }
6442     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6443     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6444           self.op.target_node]
6445     return env, nl, nl
6446
6447   def CheckPrereq(self):
6448     """Check prerequisites.
6449
6450     This checks that the instance and node names are valid.
6451
6452     """
6453     instance_name = self.op.instance_name
6454     self.instance = self.cfg.GetInstanceInfo(instance_name)
6455     assert self.instance is not None, \
6456           "Cannot retrieve locked instance %s" % self.op.instance_name
6457     _CheckNodeOnline(self, self.instance.primary_node)
6458
6459     self.dst_node = self.cfg.GetNodeInfo(
6460       self.cfg.ExpandNodeName(self.op.target_node))
6461
6462     if self.dst_node is None:
6463       # This is wrong node name, not a non-locked node
6464       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6465     _CheckNodeOnline(self, self.dst_node.name)
6466     _CheckNodeNotDrained(self, self.dst_node.name)
6467
6468     # instance disk type verification
6469     for disk in self.instance.disks:
6470       if disk.dev_type == constants.LD_FILE:
6471         raise errors.OpPrereqError("Export not supported for instances with"
6472                                    " file-based disks")
6473
6474   def Exec(self, feedback_fn):
6475     """Export an instance to an image in the cluster.
6476
6477     """
6478     instance = self.instance
6479     dst_node = self.dst_node
6480     src_node = instance.primary_node
6481     if self.op.shutdown:
6482       # shutdown the instance, but not the disks
6483       result = self.rpc.call_instance_shutdown(src_node, instance)
6484       result.Raise("Could not shutdown instance %s on"
6485                    " node %s" % (instance.name, src_node))
6486
6487     vgname = self.cfg.GetVGName()
6488
6489     snap_disks = []
6490
6491     # set the disks ID correctly since call_instance_start needs the
6492     # correct drbd minor to create the symlinks
6493     for disk in instance.disks:
6494       self.cfg.SetDiskID(disk, src_node)
6495
6496     try:
6497       for idx, disk in enumerate(instance.disks):
6498         # result.payload will be a snapshot of an lvm leaf of the one we passed
6499         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6500         msg = result.fail_msg
6501         if msg:
6502           self.LogWarning("Could not snapshot disk/%s on node %s: %s",
6503                           idx, src_node, msg)
6504           snap_disks.append(False)
6505         else:
6506           disk_id = (vgname, result.payload)
6507           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6508                                  logical_id=disk_id, physical_id=disk_id,
6509                                  iv_name=disk.iv_name)
6510           snap_disks.append(new_dev)
6511
6512     finally:
6513       if self.op.shutdown and instance.admin_up:
6514         result = self.rpc.call_instance_start(src_node, instance, None, None)
6515         msg = result.fail_msg
6516         if msg:
6517           _ShutdownInstanceDisks(self, instance)
6518           raise errors.OpExecError("Could not start instance: %s" % msg)
6519
6520     # TODO: check for size
6521
6522     cluster_name = self.cfg.GetClusterName()
6523     for idx, dev in enumerate(snap_disks):
6524       if dev:
6525         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6526                                                instance, cluster_name, idx)
6527         msg = result.fail_msg
6528         if msg:
6529           self.LogWarning("Could not export disk/%s from node %s to"
6530                           " node %s: %s", idx, src_node, dst_node.name, msg)
6531         msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
6532         if msg:
6533           self.LogWarning("Could not remove snapshot for disk/%d from node"
6534                           " %s: %s", idx, src_node, msg)
6535
6536     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6537     msg = result.fail_msg
6538     if msg:
6539       self.LogWarning("Could not finalize export for instance %s"
6540                       " on node %s: %s", instance.name, dst_node.name, msg)
6541
6542     nodelist = self.cfg.GetNodeList()
6543     nodelist.remove(dst_node.name)
6544
6545     # on one-node clusters nodelist will be empty after the removal
6546     # if we proceed the backup would be removed because OpQueryExports
6547     # substitutes an empty list with the full cluster node list.
6548     iname = instance.name
6549     if nodelist:
6550       exportlist = self.rpc.call_export_list(nodelist)
6551       for node in exportlist:
6552         if exportlist[node].fail_msg:
6553           continue
6554         if iname in exportlist[node].payload:
6555           msg = self.rpc.call_export_remove(node, iname).fail_msg
6556           if msg:
6557             self.LogWarning("Could not remove older export for instance %s"
6558                             " on node %s: %s", iname, node, msg)
6559
6560
6561 class LURemoveExport(NoHooksLU):
6562   """Remove exports related to the named instance.
6563
6564   """
6565   _OP_REQP = ["instance_name"]
6566   REQ_BGL = False
6567
6568   def ExpandNames(self):
6569     self.needed_locks = {}
6570     # We need all nodes to be locked in order for RemoveExport to work, but we
6571     # don't need to lock the instance itself, as nothing will happen to it (and
6572     # we can remove exports also for a removed instance)
6573     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6574
6575   def CheckPrereq(self):
6576     """Check prerequisites.
6577     """
6578     pass
6579
6580   def Exec(self, feedback_fn):
6581     """Remove any export.
6582
6583     """
6584     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6585     # If the instance was not found we'll try with the name that was passed in.
6586     # This will only work if it was an FQDN, though.
6587     fqdn_warn = False
6588     if not instance_name:
6589       fqdn_warn = True
6590       instance_name = self.op.instance_name
6591
6592     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6593     exportlist = self.rpc.call_export_list(locked_nodes)
6594     found = False
6595     for node in exportlist:
6596       msg = exportlist[node].fail_msg
6597       if msg:
6598         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6599         continue
6600       if instance_name in exportlist[node].payload:
6601         found = True
6602         result = self.rpc.call_export_remove(node, instance_name)
6603         msg = result.fail_msg
6604         if msg:
6605           logging.error("Could not remove export for instance %s"
6606                         " on node %s: %s", instance_name, node, msg)
6607
6608     if fqdn_warn and not found:
6609       feedback_fn("Export not found. If trying to remove an export belonging"
6610                   " to a deleted instance please use its Fully Qualified"
6611                   " Domain Name.")
6612
6613
6614 class TagsLU(NoHooksLU):
6615   """Generic tags LU.
6616
6617   This is an abstract class which is the parent of all the other tags LUs.
6618
6619   """
6620
6621   def ExpandNames(self):
6622     self.needed_locks = {}
6623     if self.op.kind == constants.TAG_NODE:
6624       name = self.cfg.ExpandNodeName(self.op.name)
6625       if name is None:
6626         raise errors.OpPrereqError("Invalid node name (%s)" %
6627                                    (self.op.name,))
6628       self.op.name = name
6629       self.needed_locks[locking.LEVEL_NODE] = name
6630     elif self.op.kind == constants.TAG_INSTANCE:
6631       name = self.cfg.ExpandInstanceName(self.op.name)
6632       if name is None:
6633         raise errors.OpPrereqError("Invalid instance name (%s)" %
6634                                    (self.op.name,))
6635       self.op.name = name
6636       self.needed_locks[locking.LEVEL_INSTANCE] = name
6637
6638   def CheckPrereq(self):
6639     """Check prerequisites.
6640
6641     """
6642     if self.op.kind == constants.TAG_CLUSTER:
6643       self.target = self.cfg.GetClusterInfo()
6644     elif self.op.kind == constants.TAG_NODE:
6645       self.target = self.cfg.GetNodeInfo(self.op.name)
6646     elif self.op.kind == constants.TAG_INSTANCE:
6647       self.target = self.cfg.GetInstanceInfo(self.op.name)
6648     else:
6649       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6650                                  str(self.op.kind))
6651
6652
6653 class LUGetTags(TagsLU):
6654   """Returns the tags of a given object.
6655
6656   """
6657   _OP_REQP = ["kind", "name"]
6658   REQ_BGL = False
6659
6660   def Exec(self, feedback_fn):
6661     """Returns the tag list.
6662
6663     """
6664     return list(self.target.GetTags())
6665
6666
6667 class LUSearchTags(NoHooksLU):
6668   """Searches the tags for a given pattern.
6669
6670   """
6671   _OP_REQP = ["pattern"]
6672   REQ_BGL = False
6673
6674   def ExpandNames(self):
6675     self.needed_locks = {}
6676
6677   def CheckPrereq(self):
6678     """Check prerequisites.
6679
6680     This checks the pattern passed for validity by compiling it.
6681
6682     """
6683     try:
6684       self.re = re.compile(self.op.pattern)
6685     except re.error, err:
6686       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6687                                  (self.op.pattern, err))
6688
6689   def Exec(self, feedback_fn):
6690     """Returns the tag list.
6691
6692     """
6693     cfg = self.cfg
6694     tgts = [("/cluster", cfg.GetClusterInfo())]
6695     ilist = cfg.GetAllInstancesInfo().values()
6696     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6697     nlist = cfg.GetAllNodesInfo().values()
6698     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6699     results = []
6700     for path, target in tgts:
6701       for tag in target.GetTags():
6702         if self.re.search(tag):
6703           results.append((path, tag))
6704     return results
6705
6706
6707 class LUAddTags(TagsLU):
6708   """Sets a tag on a given object.
6709
6710   """
6711   _OP_REQP = ["kind", "name", "tags"]
6712   REQ_BGL = False
6713
6714   def CheckPrereq(self):
6715     """Check prerequisites.
6716
6717     This checks the type and length of the tag name and value.
6718
6719     """
6720     TagsLU.CheckPrereq(self)
6721     for tag in self.op.tags:
6722       objects.TaggableObject.ValidateTag(tag)
6723
6724   def Exec(self, feedback_fn):
6725     """Sets the tag.
6726
6727     """
6728     try:
6729       for tag in self.op.tags:
6730         self.target.AddTag(tag)
6731     except errors.TagError, err:
6732       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6733     try:
6734       self.cfg.Update(self.target)
6735     except errors.ConfigurationError:
6736       raise errors.OpRetryError("There has been a modification to the"
6737                                 " config file and the operation has been"
6738                                 " aborted. Please retry.")
6739
6740
6741 class LUDelTags(TagsLU):
6742   """Delete a list of tags from a given object.
6743
6744   """
6745   _OP_REQP = ["kind", "name", "tags"]
6746   REQ_BGL = False
6747
6748   def CheckPrereq(self):
6749     """Check prerequisites.
6750
6751     This checks that we have the given tag.
6752
6753     """
6754     TagsLU.CheckPrereq(self)
6755     for tag in self.op.tags:
6756       objects.TaggableObject.ValidateTag(tag)
6757     del_tags = frozenset(self.op.tags)
6758     cur_tags = self.target.GetTags()
6759     if not del_tags <= cur_tags:
6760       diff_tags = del_tags - cur_tags
6761       diff_names = ["'%s'" % tag for tag in diff_tags]
6762       diff_names.sort()
6763       raise errors.OpPrereqError("Tag(s) %s not found" %
6764                                  (",".join(diff_names)))
6765
6766   def Exec(self, feedback_fn):
6767     """Remove the tag from the object.
6768
6769     """
6770     for tag in self.op.tags:
6771       self.target.RemoveTag(tag)
6772     try:
6773       self.cfg.Update(self.target)
6774     except errors.ConfigurationError:
6775       raise errors.OpRetryError("There has been a modification to the"
6776                                 " config file and the operation has been"
6777                                 " aborted. Please retry.")
6778
6779
6780 class LUTestDelay(NoHooksLU):
6781   """Sleep for a specified amount of time.
6782
6783   This LU sleeps on the master and/or nodes for a specified amount of
6784   time.
6785
6786   """
6787   _OP_REQP = ["duration", "on_master", "on_nodes"]
6788   REQ_BGL = False
6789
6790   def ExpandNames(self):
6791     """Expand names and set required locks.
6792
6793     This expands the node list, if any.
6794
6795     """
6796     self.needed_locks = {}
6797     if self.op.on_nodes:
6798       # _GetWantedNodes can be used here, but is not always appropriate to use
6799       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6800       # more information.
6801       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6802       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6803
6804   def CheckPrereq(self):
6805     """Check prerequisites.
6806
6807     """
6808
6809   def Exec(self, feedback_fn):
6810     """Do the actual sleep.
6811
6812     """
6813     if self.op.on_master:
6814       if not utils.TestDelay(self.op.duration):
6815         raise errors.OpExecError("Error during master delay test")
6816     if self.op.on_nodes:
6817       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6818       for node, node_result in result.items():
6819         node_result.Raise("Failure during rpc call to node %s" % node)
6820
6821
6822 class IAllocator(object):
6823   """IAllocator framework.
6824
6825   An IAllocator instance has three sets of attributes:
6826     - cfg that is needed to query the cluster
6827     - input data (all members of the _KEYS class attribute are required)
6828     - four buffer attributes (in|out_data|text), that represent the
6829       input (to the external script) in text and data structure format,
6830       and the output from it, again in two formats
6831     - the result variables from the script (success, info, nodes) for
6832       easy usage
6833
6834   """
6835   _ALLO_KEYS = [
6836     "mem_size", "disks", "disk_template",
6837     "os", "tags", "nics", "vcpus", "hypervisor",
6838     ]
6839   _RELO_KEYS = [
6840     "relocate_from",
6841     ]
6842
6843   def __init__(self, lu, mode, name, **kwargs):
6844     self.lu = lu
6845     # init buffer variables
6846     self.in_text = self.out_text = self.in_data = self.out_data = None
6847     # init all input fields so that pylint is happy
6848     self.mode = mode
6849     self.name = name
6850     self.mem_size = self.disks = self.disk_template = None
6851     self.os = self.tags = self.nics = self.vcpus = None
6852     self.hypervisor = None
6853     self.relocate_from = None
6854     # computed fields
6855     self.required_nodes = None
6856     # init result fields
6857     self.success = self.info = self.nodes = None
6858     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6859       keyset = self._ALLO_KEYS
6860     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6861       keyset = self._RELO_KEYS
6862     else:
6863       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6864                                    " IAllocator" % self.mode)
6865     for key in kwargs:
6866       if key not in keyset:
6867         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6868                                      " IAllocator" % key)
6869       setattr(self, key, kwargs[key])
6870     for key in keyset:
6871       if key not in kwargs:
6872         raise errors.ProgrammerError("Missing input parameter '%s' to"
6873                                      " IAllocator" % key)
6874     self._BuildInputData()
6875
6876   def _ComputeClusterData(self):
6877     """Compute the generic allocator input data.
6878
6879     This is the data that is independent of the actual operation.
6880
6881     """
6882     cfg = self.lu.cfg
6883     cluster_info = cfg.GetClusterInfo()
6884     # cluster data
6885     data = {
6886       "version": constants.IALLOCATOR_VERSION,
6887       "cluster_name": cfg.GetClusterName(),
6888       "cluster_tags": list(cluster_info.GetTags()),
6889       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6890       # we don't have job IDs
6891       }
6892     iinfo = cfg.GetAllInstancesInfo().values()
6893     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6894
6895     # node data
6896     node_results = {}
6897     node_list = cfg.GetNodeList()
6898
6899     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6900       hypervisor_name = self.hypervisor
6901     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6902       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6903
6904     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6905                                            hypervisor_name)
6906     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6907                        cluster_info.enabled_hypervisors)
6908     for nname, nresult in node_data.items():
6909       # first fill in static (config-based) values
6910       ninfo = cfg.GetNodeInfo(nname)
6911       pnr = {
6912         "tags": list(ninfo.GetTags()),
6913         "primary_ip": ninfo.primary_ip,
6914         "secondary_ip": ninfo.secondary_ip,
6915         "offline": ninfo.offline,
6916         "drained": ninfo.drained,
6917         "master_candidate": ninfo.master_candidate,
6918         }
6919
6920       if not ninfo.offline:
6921         nresult.Raise("Can't get data for node %s" % nname)
6922         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
6923                                 nname)
6924         remote_info = nresult.payload
6925         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6926                      'vg_size', 'vg_free', 'cpu_total']:
6927           if attr not in remote_info:
6928             raise errors.OpExecError("Node '%s' didn't return attribute"
6929                                      " '%s'" % (nname, attr))
6930           if not isinstance(remote_info[attr], int):
6931             raise errors.OpExecError("Node '%s' returned invalid value"
6932                                      " for '%s': %s" %
6933                                      (nname, attr, remote_info[attr]))
6934         # compute memory used by primary instances
6935         i_p_mem = i_p_up_mem = 0
6936         for iinfo, beinfo in i_list:
6937           if iinfo.primary_node == nname:
6938             i_p_mem += beinfo[constants.BE_MEMORY]
6939             if iinfo.name not in node_iinfo[nname].payload:
6940               i_used_mem = 0
6941             else:
6942               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6943             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6944             remote_info['memory_free'] -= max(0, i_mem_diff)
6945
6946             if iinfo.admin_up:
6947               i_p_up_mem += beinfo[constants.BE_MEMORY]
6948
6949         # compute memory used by instances
6950         pnr_dyn = {
6951           "total_memory": remote_info['memory_total'],
6952           "reserved_memory": remote_info['memory_dom0'],
6953           "free_memory": remote_info['memory_free'],
6954           "total_disk": remote_info['vg_size'],
6955           "free_disk": remote_info['vg_free'],
6956           "total_cpus": remote_info['cpu_total'],
6957           "i_pri_memory": i_p_mem,
6958           "i_pri_up_memory": i_p_up_mem,
6959           }
6960         pnr.update(pnr_dyn)
6961
6962       node_results[nname] = pnr
6963     data["nodes"] = node_results
6964
6965     # instance data
6966     instance_data = {}
6967     for iinfo, beinfo in i_list:
6968       nic_data = []
6969       for nic in iinfo.nics:
6970         filled_params = objects.FillDict(
6971             cluster_info.nicparams[constants.PP_DEFAULT],
6972             nic.nicparams)
6973         nic_dict = {"mac": nic.mac,
6974                     "ip": nic.ip,
6975                     "mode": filled_params[constants.NIC_MODE],
6976                     "link": filled_params[constants.NIC_LINK],
6977                    }
6978         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6979           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6980         nic_data.append(nic_dict)
6981       pir = {
6982         "tags": list(iinfo.GetTags()),
6983         "admin_up": iinfo.admin_up,
6984         "vcpus": beinfo[constants.BE_VCPUS],
6985         "memory": beinfo[constants.BE_MEMORY],
6986         "os": iinfo.os,
6987         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6988         "nics": nic_data,
6989         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6990         "disk_template": iinfo.disk_template,
6991         "hypervisor": iinfo.hypervisor,
6992         }
6993       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6994                                                  pir["disks"])
6995       instance_data[iinfo.name] = pir
6996
6997     data["instances"] = instance_data
6998
6999     self.in_data = data
7000
7001   def _AddNewInstance(self):
7002     """Add new instance data to allocator structure.
7003
7004     This in combination with _AllocatorGetClusterData will create the
7005     correct structure needed as input for the allocator.
7006
7007     The checks for the completeness of the opcode must have already been
7008     done.
7009
7010     """
7011     data = self.in_data
7012
7013     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7014
7015     if self.disk_template in constants.DTS_NET_MIRROR:
7016       self.required_nodes = 2
7017     else:
7018       self.required_nodes = 1
7019     request = {
7020       "type": "allocate",
7021       "name": self.name,
7022       "disk_template": self.disk_template,
7023       "tags": self.tags,
7024       "os": self.os,
7025       "vcpus": self.vcpus,
7026       "memory": self.mem_size,
7027       "disks": self.disks,
7028       "disk_space_total": disk_space,
7029       "nics": self.nics,
7030       "required_nodes": self.required_nodes,
7031       }
7032     data["request"] = request
7033
7034   def _AddRelocateInstance(self):
7035     """Add relocate instance data to allocator structure.
7036
7037     This in combination with _IAllocatorGetClusterData will create the
7038     correct structure needed as input for the allocator.
7039
7040     The checks for the completeness of the opcode must have already been
7041     done.
7042
7043     """
7044     instance = self.lu.cfg.GetInstanceInfo(self.name)
7045     if instance is None:
7046       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7047                                    " IAllocator" % self.name)
7048
7049     if instance.disk_template not in constants.DTS_NET_MIRROR:
7050       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7051
7052     if len(instance.secondary_nodes) != 1:
7053       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7054
7055     self.required_nodes = 1
7056     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7057     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7058
7059     request = {
7060       "type": "relocate",
7061       "name": self.name,
7062       "disk_space_total": disk_space,
7063       "required_nodes": self.required_nodes,
7064       "relocate_from": self.relocate_from,
7065       }
7066     self.in_data["request"] = request
7067
7068   def _BuildInputData(self):
7069     """Build input data structures.
7070
7071     """
7072     self._ComputeClusterData()
7073
7074     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7075       self._AddNewInstance()
7076     else:
7077       self._AddRelocateInstance()
7078
7079     self.in_text = serializer.Dump(self.in_data)
7080
7081   def Run(self, name, validate=True, call_fn=None):
7082     """Run an instance allocator and return the results.
7083
7084     """
7085     if call_fn is None:
7086       call_fn = self.lu.rpc.call_iallocator_runner
7087     data = self.in_text
7088
7089     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7090     result.Raise("Failure while running the iallocator script")
7091
7092     self.out_text = result.payload
7093     if validate:
7094       self._ValidateResult()
7095
7096   def _ValidateResult(self):
7097     """Process the allocator results.
7098
7099     This will process and if successful save the result in
7100     self.out_data and the other parameters.
7101
7102     """
7103     try:
7104       rdict = serializer.Load(self.out_text)
7105     except Exception, err:
7106       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7107
7108     if not isinstance(rdict, dict):
7109       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7110
7111     for key in "success", "info", "nodes":
7112       if key not in rdict:
7113         raise errors.OpExecError("Can't parse iallocator results:"
7114                                  " missing key '%s'" % key)
7115       setattr(self, key, rdict[key])
7116
7117     if not isinstance(rdict["nodes"], list):
7118       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7119                                " is not a list")
7120     self.out_data = rdict
7121
7122
7123 class LUTestAllocator(NoHooksLU):
7124   """Run allocator tests.
7125
7126   This LU runs the allocator tests
7127
7128   """
7129   _OP_REQP = ["direction", "mode", "name"]
7130
7131   def CheckPrereq(self):
7132     """Check prerequisites.
7133
7134     This checks the opcode parameters depending on the director and mode test.
7135
7136     """
7137     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7138       for attr in ["name", "mem_size", "disks", "disk_template",
7139                    "os", "tags", "nics", "vcpus"]:
7140         if not hasattr(self.op, attr):
7141           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7142                                      attr)
7143       iname = self.cfg.ExpandInstanceName(self.op.name)
7144       if iname is not None:
7145         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7146                                    iname)
7147       if not isinstance(self.op.nics, list):
7148         raise errors.OpPrereqError("Invalid parameter 'nics'")
7149       for row in self.op.nics:
7150         if (not isinstance(row, dict) or
7151             "mac" not in row or
7152             "ip" not in row or
7153             "bridge" not in row):
7154           raise errors.OpPrereqError("Invalid contents of the"
7155                                      " 'nics' parameter")
7156       if not isinstance(self.op.disks, list):
7157         raise errors.OpPrereqError("Invalid parameter 'disks'")
7158       for row in self.op.disks:
7159         if (not isinstance(row, dict) or
7160             "size" not in row or
7161             not isinstance(row["size"], int) or
7162             "mode" not in row or
7163             row["mode"] not in ['r', 'w']):
7164           raise errors.OpPrereqError("Invalid contents of the"
7165                                      " 'disks' parameter")
7166       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7167         self.op.hypervisor = self.cfg.GetHypervisorType()
7168     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7169       if not hasattr(self.op, "name"):
7170         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7171       fname = self.cfg.ExpandInstanceName(self.op.name)
7172       if fname is None:
7173         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7174                                    self.op.name)
7175       self.op.name = fname
7176       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7177     else:
7178       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7179                                  self.op.mode)
7180
7181     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7182       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7183         raise errors.OpPrereqError("Missing allocator name")
7184     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7185       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7186                                  self.op.direction)
7187
7188   def Exec(self, feedback_fn):
7189     """Run the allocator test.
7190
7191     """
7192     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7193       ial = IAllocator(self,
7194                        mode=self.op.mode,
7195                        name=self.op.name,
7196                        mem_size=self.op.mem_size,
7197                        disks=self.op.disks,
7198                        disk_template=self.op.disk_template,
7199                        os=self.op.os,
7200                        tags=self.op.tags,
7201                        nics=self.op.nics,
7202                        vcpus=self.op.vcpus,
7203                        hypervisor=self.op.hypervisor,
7204                        )
7205     else:
7206       ial = IAllocator(self,
7207                        mode=self.op.mode,
7208                        name=self.op.name,
7209                        relocate_from=list(self.relocate_from),
7210                        )
7211
7212     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7213       result = ial.in_text
7214     else:
7215       ial.Run(self.op.allocator, validate=False)
7216       result = ial.out_text
7217     return result