Convert export_list rpc to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     result.Raise()
614     if not result.data:
615       raise errors.OpPrereqError("One or more target bridges %s does not"
616                                  " exist on destination node '%s'" %
617                                  (brlist, target_node))
618
619
620 def _CheckInstanceBridgesExist(lu, instance, node=None):
621   """Check that the brigdes needed by an instance exist.
622
623   """
624   if node is None:
625     node=instance.primary_node
626   _CheckNicsBridgesExist(lu, instance.nics, node)
627
628
629 class LUDestroyCluster(NoHooksLU):
630   """Logical unit for destroying the cluster.
631
632   """
633   _OP_REQP = []
634
635   def CheckPrereq(self):
636     """Check prerequisites.
637
638     This checks whether the cluster is empty.
639
640     Any errors are signalled by raising errors.OpPrereqError.
641
642     """
643     master = self.cfg.GetMasterNode()
644
645     nodelist = self.cfg.GetNodeList()
646     if len(nodelist) != 1 or nodelist[0] != master:
647       raise errors.OpPrereqError("There are still %d node(s) in"
648                                  " this cluster." % (len(nodelist) - 1))
649     instancelist = self.cfg.GetInstanceList()
650     if instancelist:
651       raise errors.OpPrereqError("There are still %d instance(s) in"
652                                  " this cluster." % len(instancelist))
653
654   def Exec(self, feedback_fn):
655     """Destroys the cluster.
656
657     """
658     master = self.cfg.GetMasterNode()
659     result = self.rpc.call_node_stop_master(master, False)
660     result.Raise()
661     if not result.data:
662       raise errors.OpExecError("Could not disable the master role")
663     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
664     utils.CreateBackup(priv_key)
665     utils.CreateBackup(pub_key)
666     return master
667
668
669 class LUVerifyCluster(LogicalUnit):
670   """Verifies the cluster status.
671
672   """
673   HPATH = "cluster-verify"
674   HTYPE = constants.HTYPE_CLUSTER
675   _OP_REQP = ["skip_checks"]
676   REQ_BGL = False
677
678   def ExpandNames(self):
679     self.needed_locks = {
680       locking.LEVEL_NODE: locking.ALL_SET,
681       locking.LEVEL_INSTANCE: locking.ALL_SET,
682     }
683     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
684
685   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
686                   node_result, feedback_fn, master_files,
687                   drbd_map, vg_name):
688     """Run multiple tests against a node.
689
690     Test list:
691
692       - compares ganeti version
693       - checks vg existance and size > 20G
694       - checks config file checksum
695       - checks ssh to other nodes
696
697     @type nodeinfo: L{objects.Node}
698     @param nodeinfo: the node to check
699     @param file_list: required list of files
700     @param local_cksum: dictionary of local files and their checksums
701     @param node_result: the results from the node
702     @param feedback_fn: function used to accumulate results
703     @param master_files: list of files that only masters should have
704     @param drbd_map: the useddrbd minors for this node, in
705         form of minor: (instance, must_exist) which correspond to instances
706         and their running status
707     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
708
709     """
710     node = nodeinfo.name
711
712     # main result, node_result should be a non-empty dict
713     if not node_result or not isinstance(node_result, dict):
714       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
715       return True
716
717     # compares ganeti version
718     local_version = constants.PROTOCOL_VERSION
719     remote_version = node_result.get('version', None)
720     if not (remote_version and isinstance(remote_version, (list, tuple)) and
721             len(remote_version) == 2):
722       feedback_fn("  - ERROR: connection to %s failed" % (node))
723       return True
724
725     if local_version != remote_version[0]:
726       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
727                   " node %s %s" % (local_version, node, remote_version[0]))
728       return True
729
730     # node seems compatible, we can actually try to look into its results
731
732     bad = False
733
734     # full package version
735     if constants.RELEASE_VERSION != remote_version[1]:
736       feedback_fn("  - WARNING: software version mismatch: master %s,"
737                   " node %s %s" %
738                   (constants.RELEASE_VERSION, node, remote_version[1]))
739
740     # checks vg existence and size > 20G
741     if vg_name is not None:
742       vglist = node_result.get(constants.NV_VGLIST, None)
743       if not vglist:
744         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
745                         (node,))
746         bad = True
747       else:
748         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
749                                               constants.MIN_VG_SIZE)
750         if vgstatus:
751           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
752           bad = True
753
754     # checks config file checksum
755
756     remote_cksum = node_result.get(constants.NV_FILELIST, None)
757     if not isinstance(remote_cksum, dict):
758       bad = True
759       feedback_fn("  - ERROR: node hasn't returned file checksum data")
760     else:
761       for file_name in file_list:
762         node_is_mc = nodeinfo.master_candidate
763         must_have_file = file_name not in master_files
764         if file_name not in remote_cksum:
765           if node_is_mc or must_have_file:
766             bad = True
767             feedback_fn("  - ERROR: file '%s' missing" % file_name)
768         elif remote_cksum[file_name] != local_cksum[file_name]:
769           if node_is_mc or must_have_file:
770             bad = True
771             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
772           else:
773             # not candidate and this is not a must-have file
774             bad = True
775             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
776                         " '%s'" % file_name)
777         else:
778           # all good, except non-master/non-must have combination
779           if not node_is_mc and not must_have_file:
780             feedback_fn("  - ERROR: file '%s' should not exist on non master"
781                         " candidates" % file_name)
782
783     # checks ssh to any
784
785     if constants.NV_NODELIST not in node_result:
786       bad = True
787       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
788     else:
789       if node_result[constants.NV_NODELIST]:
790         bad = True
791         for node in node_result[constants.NV_NODELIST]:
792           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
793                           (node, node_result[constants.NV_NODELIST][node]))
794
795     if constants.NV_NODENETTEST not in node_result:
796       bad = True
797       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
798     else:
799       if node_result[constants.NV_NODENETTEST]:
800         bad = True
801         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
802         for node in nlist:
803           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
804                           (node, node_result[constants.NV_NODENETTEST][node]))
805
806     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
807     if isinstance(hyp_result, dict):
808       for hv_name, hv_result in hyp_result.iteritems():
809         if hv_result is not None:
810           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
811                       (hv_name, hv_result))
812
813     # check used drbd list
814     if vg_name is not None:
815       used_minors = node_result.get(constants.NV_DRBDLIST, [])
816       if not isinstance(used_minors, (tuple, list)):
817         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
818                     str(used_minors))
819       else:
820         for minor, (iname, must_exist) in drbd_map.items():
821           if minor not in used_minors and must_exist:
822             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
823                         " not active" % (minor, iname))
824             bad = True
825         for minor in used_minors:
826           if minor not in drbd_map:
827             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
828                         minor)
829             bad = True
830
831     return bad
832
833   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
834                       node_instance, feedback_fn, n_offline):
835     """Verify an instance.
836
837     This function checks to see if the required block devices are
838     available on the instance's node.
839
840     """
841     bad = False
842
843     node_current = instanceconfig.primary_node
844
845     node_vol_should = {}
846     instanceconfig.MapLVsByNode(node_vol_should)
847
848     for node in node_vol_should:
849       if node in n_offline:
850         # ignore missing volumes on offline nodes
851         continue
852       for volume in node_vol_should[node]:
853         if node not in node_vol_is or volume not in node_vol_is[node]:
854           feedback_fn("  - ERROR: volume %s missing on node %s" %
855                           (volume, node))
856           bad = True
857
858     if instanceconfig.admin_up:
859       if ((node_current not in node_instance or
860           not instance in node_instance[node_current]) and
861           node_current not in n_offline):
862         feedback_fn("  - ERROR: instance %s not running on node %s" %
863                         (instance, node_current))
864         bad = True
865
866     for node in node_instance:
867       if (not node == node_current):
868         if instance in node_instance[node]:
869           feedback_fn("  - ERROR: instance %s should not run on node %s" %
870                           (instance, node))
871           bad = True
872
873     return bad
874
875   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
876     """Verify if there are any unknown volumes in the cluster.
877
878     The .os, .swap and backup volumes are ignored. All other volumes are
879     reported as unknown.
880
881     """
882     bad = False
883
884     for node in node_vol_is:
885       for volume in node_vol_is[node]:
886         if node not in node_vol_should or volume not in node_vol_should[node]:
887           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
888                       (volume, node))
889           bad = True
890     return bad
891
892   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
893     """Verify the list of running instances.
894
895     This checks what instances are running but unknown to the cluster.
896
897     """
898     bad = False
899     for node in node_instance:
900       for runninginstance in node_instance[node]:
901         if runninginstance not in instancelist:
902           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
903                           (runninginstance, node))
904           bad = True
905     return bad
906
907   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
908     """Verify N+1 Memory Resilience.
909
910     Check that if one single node dies we can still start all the instances it
911     was primary for.
912
913     """
914     bad = False
915
916     for node, nodeinfo in node_info.iteritems():
917       # This code checks that every node which is now listed as secondary has
918       # enough memory to host all instances it is supposed to should a single
919       # other node in the cluster fail.
920       # FIXME: not ready for failover to an arbitrary node
921       # FIXME: does not support file-backed instances
922       # WARNING: we currently take into account down instances as well as up
923       # ones, considering that even if they're down someone might want to start
924       # them even in the event of a node failure.
925       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
926         needed_mem = 0
927         for instance in instances:
928           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
929           if bep[constants.BE_AUTO_BALANCE]:
930             needed_mem += bep[constants.BE_MEMORY]
931         if nodeinfo['mfree'] < needed_mem:
932           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
933                       " failovers should node %s fail" % (node, prinode))
934           bad = True
935     return bad
936
937   def CheckPrereq(self):
938     """Check prerequisites.
939
940     Transform the list of checks we're going to skip into a set and check that
941     all its members are valid.
942
943     """
944     self.skip_set = frozenset(self.op.skip_checks)
945     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
946       raise errors.OpPrereqError("Invalid checks to be skipped specified")
947
948   def BuildHooksEnv(self):
949     """Build hooks env.
950
951     Cluster-Verify hooks just rone in the post phase and their failure makes
952     the output be logged in the verify output and the verification to fail.
953
954     """
955     all_nodes = self.cfg.GetNodeList()
956     env = {
957       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
958       }
959     for node in self.cfg.GetAllNodesInfo().values():
960       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
961
962     return env, [], all_nodes
963
964   def Exec(self, feedback_fn):
965     """Verify integrity of cluster, performing various test on nodes.
966
967     """
968     bad = False
969     feedback_fn("* Verifying global settings")
970     for msg in self.cfg.VerifyConfig():
971       feedback_fn("  - ERROR: %s" % msg)
972
973     vg_name = self.cfg.GetVGName()
974     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
975     nodelist = utils.NiceSort(self.cfg.GetNodeList())
976     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
977     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
978     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
979                         for iname in instancelist)
980     i_non_redundant = [] # Non redundant instances
981     i_non_a_balanced = [] # Non auto-balanced instances
982     n_offline = [] # List of offline nodes
983     n_drained = [] # List of nodes being drained
984     node_volume = {}
985     node_instance = {}
986     node_info = {}
987     instance_cfg = {}
988
989     # FIXME: verify OS list
990     # do local checksums
991     master_files = [constants.CLUSTER_CONF_FILE]
992
993     file_names = ssconf.SimpleStore().GetFileList()
994     file_names.append(constants.SSL_CERT_FILE)
995     file_names.append(constants.RAPI_CERT_FILE)
996     file_names.extend(master_files)
997
998     local_checksums = utils.FingerprintFiles(file_names)
999
1000     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1001     node_verify_param = {
1002       constants.NV_FILELIST: file_names,
1003       constants.NV_NODELIST: [node.name for node in nodeinfo
1004                               if not node.offline],
1005       constants.NV_HYPERVISOR: hypervisors,
1006       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1007                                   node.secondary_ip) for node in nodeinfo
1008                                  if not node.offline],
1009       constants.NV_INSTANCELIST: hypervisors,
1010       constants.NV_VERSION: None,
1011       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1012       }
1013     if vg_name is not None:
1014       node_verify_param[constants.NV_VGLIST] = None
1015       node_verify_param[constants.NV_LVLIST] = vg_name
1016       node_verify_param[constants.NV_DRBDLIST] = None
1017     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1018                                            self.cfg.GetClusterName())
1019
1020     cluster = self.cfg.GetClusterInfo()
1021     master_node = self.cfg.GetMasterNode()
1022     all_drbd_map = self.cfg.ComputeDRBDMap()
1023
1024     for node_i in nodeinfo:
1025       node = node_i.name
1026       nresult = all_nvinfo[node].data
1027
1028       if node_i.offline:
1029         feedback_fn("* Skipping offline node %s" % (node,))
1030         n_offline.append(node)
1031         continue
1032
1033       if node == master_node:
1034         ntype = "master"
1035       elif node_i.master_candidate:
1036         ntype = "master candidate"
1037       elif node_i.drained:
1038         ntype = "drained"
1039         n_drained.append(node)
1040       else:
1041         ntype = "regular"
1042       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1043
1044       if all_nvinfo[node].failed or not isinstance(nresult, dict):
1045         feedback_fn("  - ERROR: connection to %s failed" % (node,))
1046         bad = True
1047         continue
1048
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     """
1293     result = res_nodes, res_nlvm, res_instances, res_missing = [], {}, [], {}
1294
1295     vg_name = self.cfg.GetVGName()
1296     nodes = utils.NiceSort(self.cfg.GetNodeList())
1297     instances = [self.cfg.GetInstanceInfo(name)
1298                  for name in self.cfg.GetInstanceList()]
1299
1300     nv_dict = {}
1301     for inst in instances:
1302       inst_lvs = {}
1303       if (not inst.admin_up or
1304           inst.disk_template not in constants.DTS_NET_MIRROR):
1305         continue
1306       inst.MapLVsByNode(inst_lvs)
1307       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1308       for node, vol_list in inst_lvs.iteritems():
1309         for vol in vol_list:
1310           nv_dict[(node, vol)] = inst
1311
1312     if not nv_dict:
1313       return result
1314
1315     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1316
1317     to_act = set()
1318     for node in nodes:
1319       # node_volume
1320       lvs = node_lvs[node]
1321       if lvs.failed:
1322         if not lvs.offline:
1323           self.LogWarning("Connection to node %s failed: %s" %
1324                           (node, lvs.data))
1325         continue
1326       lvs = lvs.data
1327       if isinstance(lvs, basestring):
1328         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
1329         res_nlvm[node] = lvs
1330         continue
1331       elif not isinstance(lvs, dict):
1332         logging.warning("Connection to node %s failed or invalid data"
1333                         " returned", node)
1334         res_nodes.append(node)
1335         continue
1336
1337       for lv_name, (_, lv_inactive, lv_online) in lvs.iteritems():
1338         inst = nv_dict.pop((node, lv_name), None)
1339         if (not lv_online and inst is not None
1340             and inst.name not in res_instances):
1341           res_instances.append(inst.name)
1342
1343     # any leftover items in nv_dict are missing LVs, let's arrange the
1344     # data better
1345     for key, inst in nv_dict.iteritems():
1346       if inst.name not in res_missing:
1347         res_missing[inst.name] = []
1348       res_missing[inst.name].append(key)
1349
1350     return result
1351
1352
1353 class LURenameCluster(LogicalUnit):
1354   """Rename the cluster.
1355
1356   """
1357   HPATH = "cluster-rename"
1358   HTYPE = constants.HTYPE_CLUSTER
1359   _OP_REQP = ["name"]
1360
1361   def BuildHooksEnv(self):
1362     """Build hooks env.
1363
1364     """
1365     env = {
1366       "OP_TARGET": self.cfg.GetClusterName(),
1367       "NEW_NAME": self.op.name,
1368       }
1369     mn = self.cfg.GetMasterNode()
1370     return env, [mn], [mn]
1371
1372   def CheckPrereq(self):
1373     """Verify that the passed name is a valid one.
1374
1375     """
1376     hostname = utils.HostInfo(self.op.name)
1377
1378     new_name = hostname.name
1379     self.ip = new_ip = hostname.ip
1380     old_name = self.cfg.GetClusterName()
1381     old_ip = self.cfg.GetMasterIP()
1382     if new_name == old_name and new_ip == old_ip:
1383       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384                                  " cluster has changed")
1385     if new_ip != old_ip:
1386       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388                                    " reachable on the network. Aborting." %
1389                                    new_ip)
1390
1391     self.op.name = new_name
1392
1393   def Exec(self, feedback_fn):
1394     """Rename the cluster.
1395
1396     """
1397     clustername = self.op.name
1398     ip = self.ip
1399
1400     # shutdown the master IP
1401     master = self.cfg.GetMasterNode()
1402     result = self.rpc.call_node_stop_master(master, False)
1403     if result.failed or not result.data:
1404       raise errors.OpExecError("Could not disable the master role")
1405
1406     try:
1407       cluster = self.cfg.GetClusterInfo()
1408       cluster.cluster_name = clustername
1409       cluster.master_ip = ip
1410       self.cfg.Update(cluster)
1411
1412       # update the known hosts file
1413       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1414       node_list = self.cfg.GetNodeList()
1415       try:
1416         node_list.remove(master)
1417       except ValueError:
1418         pass
1419       result = self.rpc.call_upload_file(node_list,
1420                                          constants.SSH_KNOWN_HOSTS_FILE)
1421       for to_node, to_result in result.iteritems():
1422          msg = to_result.RemoteFailMsg()
1423          if msg:
1424            msg = ("Copy of file %s to node %s failed: %s" %
1425                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1426            self.proc.LogWarning(msg)
1427
1428     finally:
1429       result = self.rpc.call_node_start_master(master, False)
1430       if result.failed or not result.data:
1431         self.LogWarning("Could not re-enable the master role on"
1432                         " the master, please restart manually.")
1433
1434
1435 def _RecursiveCheckIfLVMBased(disk):
1436   """Check if the given disk or its children are lvm-based.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to check
1440   @rtype: booleean
1441   @return: boolean indicating whether a LD_LV dev_type was found or not
1442
1443   """
1444   if disk.children:
1445     for chdisk in disk.children:
1446       if _RecursiveCheckIfLVMBased(chdisk):
1447         return True
1448   return disk.dev_type == constants.LD_LV
1449
1450
1451 class LUSetClusterParams(LogicalUnit):
1452   """Change the parameters of the cluster.
1453
1454   """
1455   HPATH = "cluster-modify"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   _OP_REQP = []
1458   REQ_BGL = False
1459
1460   def CheckArguments(self):
1461     """Check parameters
1462
1463     """
1464     if not hasattr(self.op, "candidate_pool_size"):
1465       self.op.candidate_pool_size = None
1466     if self.op.candidate_pool_size is not None:
1467       try:
1468         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469       except (ValueError, TypeError), err:
1470         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471                                    str(err))
1472       if self.op.candidate_pool_size < 1:
1473         raise errors.OpPrereqError("At least one master candidate needed")
1474
1475   def ExpandNames(self):
1476     # FIXME: in the future maybe other cluster params won't require checking on
1477     # all nodes to be modified.
1478     self.needed_locks = {
1479       locking.LEVEL_NODE: locking.ALL_SET,
1480     }
1481     self.share_locks[locking.LEVEL_NODE] = 1
1482
1483   def BuildHooksEnv(self):
1484     """Build hooks env.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.cfg.GetClusterName(),
1489       "NEW_VG_NAME": self.op.vg_name,
1490       }
1491     mn = self.cfg.GetMasterNode()
1492     return env, [mn], [mn]
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks whether the given params don't conflict and
1498     if the given volume group is valid.
1499
1500     """
1501     if self.op.vg_name is not None and not self.op.vg_name:
1502       instances = self.cfg.GetAllInstancesInfo().values()
1503       for inst in instances:
1504         for disk in inst.disks:
1505           if _RecursiveCheckIfLVMBased(disk):
1506             raise errors.OpPrereqError("Cannot disable lvm storage while"
1507                                        " lvm-based instances exist")
1508
1509     node_list = self.acquired_locks[locking.LEVEL_NODE]
1510
1511     # if vg_name not None, checks given volume group on all nodes
1512     if self.op.vg_name:
1513       vglist = self.rpc.call_vg_list(node_list)
1514       for node in node_list:
1515         if vglist[node].failed:
1516           # ignoring down node
1517           self.LogWarning("Node %s unreachable/error, ignoring" % node)
1518           continue
1519         vgstatus = utils.CheckVolumeGroupSize(vglist[node].data,
1520                                               self.op.vg_name,
1521                                               constants.MIN_VG_SIZE)
1522         if vgstatus:
1523           raise errors.OpPrereqError("Error on node '%s': %s" %
1524                                      (node, vgstatus))
1525
1526     self.cluster = cluster = self.cfg.GetClusterInfo()
1527     # validate params changes
1528     if self.op.beparams:
1529       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1530       self.new_beparams = objects.FillDict(
1531         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1532
1533     if self.op.nicparams:
1534       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1535       self.new_nicparams = objects.FillDict(
1536         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1537       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1538
1539     # hypervisor list/parameters
1540     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1541     if self.op.hvparams:
1542       if not isinstance(self.op.hvparams, dict):
1543         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1544       for hv_name, hv_dict in self.op.hvparams.items():
1545         if hv_name not in self.new_hvparams:
1546           self.new_hvparams[hv_name] = hv_dict
1547         else:
1548           self.new_hvparams[hv_name].update(hv_dict)
1549
1550     if self.op.enabled_hypervisors is not None:
1551       self.hv_list = self.op.enabled_hypervisors
1552     else:
1553       self.hv_list = cluster.enabled_hypervisors
1554
1555     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1556       # either the enabled list has changed, or the parameters have, validate
1557       for hv_name, hv_params in self.new_hvparams.items():
1558         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1559             (self.op.enabled_hypervisors and
1560              hv_name in self.op.enabled_hypervisors)):
1561           # either this is a new hypervisor, or its parameters have changed
1562           hv_class = hypervisor.GetHypervisor(hv_name)
1563           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1564           hv_class.CheckParameterSyntax(hv_params)
1565           _CheckHVParams(self, node_list, hv_name, hv_params)
1566
1567   def Exec(self, feedback_fn):
1568     """Change the parameters of the cluster.
1569
1570     """
1571     if self.op.vg_name is not None:
1572       new_volume = self.op.vg_name
1573       if not new_volume:
1574         new_volume = None
1575       if new_volume != self.cfg.GetVGName():
1576         self.cfg.SetVGName(new_volume)
1577       else:
1578         feedback_fn("Cluster LVM configuration already in desired"
1579                     " state, not changing")
1580     if self.op.hvparams:
1581       self.cluster.hvparams = self.new_hvparams
1582     if self.op.enabled_hypervisors is not None:
1583       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1584     if self.op.beparams:
1585       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1586     if self.op.nicparams:
1587       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1588
1589     if self.op.candidate_pool_size is not None:
1590       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1591
1592     self.cfg.Update(self.cluster)
1593
1594     # we want to update nodes after the cluster so that if any errors
1595     # happen, we have recorded and saved the cluster info
1596     if self.op.candidate_pool_size is not None:
1597       _AdjustCandidatePool(self)
1598
1599
1600 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1601   """Distribute additional files which are part of the cluster configuration.
1602
1603   ConfigWriter takes care of distributing the config and ssconf files, but
1604   there are more files which should be distributed to all nodes. This function
1605   makes sure those are copied.
1606
1607   @param lu: calling logical unit
1608   @param additional_nodes: list of nodes not in the config to distribute to
1609
1610   """
1611   # 1. Gather target nodes
1612   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1613   dist_nodes = lu.cfg.GetNodeList()
1614   if additional_nodes is not None:
1615     dist_nodes.extend(additional_nodes)
1616   if myself.name in dist_nodes:
1617     dist_nodes.remove(myself.name)
1618   # 2. Gather files to distribute
1619   dist_files = set([constants.ETC_HOSTS,
1620                     constants.SSH_KNOWN_HOSTS_FILE,
1621                     constants.RAPI_CERT_FILE,
1622                     constants.RAPI_USERS_FILE,
1623                    ])
1624
1625   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1626   for hv_name in enabled_hypervisors:
1627     hv_class = hypervisor.GetHypervisor(hv_name)
1628     dist_files.update(hv_class.GetAncillaryFiles())
1629
1630   # 3. Perform the files upload
1631   for fname in dist_files:
1632     if os.path.exists(fname):
1633       result = lu.rpc.call_upload_file(dist_nodes, fname)
1634       for to_node, to_result in result.items():
1635          msg = to_result.RemoteFailMsg()
1636          if msg:
1637            msg = ("Copy of file %s to node %s failed: %s" %
1638                    (fname, to_node, msg))
1639            lu.proc.LogWarning(msg)
1640
1641
1642 class LURedistributeConfig(NoHooksLU):
1643   """Force the redistribution of cluster configuration.
1644
1645   This is a very simple LU.
1646
1647   """
1648   _OP_REQP = []
1649   REQ_BGL = False
1650
1651   def ExpandNames(self):
1652     self.needed_locks = {
1653       locking.LEVEL_NODE: locking.ALL_SET,
1654     }
1655     self.share_locks[locking.LEVEL_NODE] = 1
1656
1657   def CheckPrereq(self):
1658     """Check prerequisites.
1659
1660     """
1661
1662   def Exec(self, feedback_fn):
1663     """Redistribute the configuration.
1664
1665     """
1666     self.cfg.Update(self.cfg.GetClusterInfo())
1667     _RedistributeAncillaryFiles(self)
1668
1669
1670 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1671   """Sleep and poll for an instance's disk to sync.
1672
1673   """
1674   if not instance.disks:
1675     return True
1676
1677   if not oneshot:
1678     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1679
1680   node = instance.primary_node
1681
1682   for dev in instance.disks:
1683     lu.cfg.SetDiskID(dev, node)
1684
1685   retries = 0
1686   while True:
1687     max_time = 0
1688     done = True
1689     cumul_degraded = False
1690     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1691     msg = rstats.RemoteFailMsg()
1692     if msg:
1693       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1694       retries += 1
1695       if retries >= 10:
1696         raise errors.RemoteError("Can't contact node %s for mirror data,"
1697                                  " aborting." % node)
1698       time.sleep(6)
1699       continue
1700     rstats = rstats.payload
1701     retries = 0
1702     for i, mstat in enumerate(rstats):
1703       if mstat is None:
1704         lu.LogWarning("Can't compute data for node %s/%s",
1705                            node, instance.disks[i].iv_name)
1706         continue
1707       # we ignore the ldisk parameter
1708       perc_done, est_time, is_degraded, _ = mstat
1709       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1710       if perc_done is not None:
1711         done = False
1712         if est_time is not None:
1713           rem_time = "%d estimated seconds remaining" % est_time
1714           max_time = est_time
1715         else:
1716           rem_time = "no time estimate"
1717         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1718                         (instance.disks[i].iv_name, perc_done, rem_time))
1719     if done or oneshot:
1720       break
1721
1722     time.sleep(min(60, max_time))
1723
1724   if done:
1725     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1726   return not cumul_degraded
1727
1728
1729 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1730   """Check that mirrors are not degraded.
1731
1732   The ldisk parameter, if True, will change the test from the
1733   is_degraded attribute (which represents overall non-ok status for
1734   the device(s)) to the ldisk (representing the local storage status).
1735
1736   """
1737   lu.cfg.SetDiskID(dev, node)
1738   if ldisk:
1739     idx = 6
1740   else:
1741     idx = 5
1742
1743   result = True
1744   if on_primary or dev.AssembleOnSecondary():
1745     rstats = lu.rpc.call_blockdev_find(node, dev)
1746     msg = rstats.RemoteFailMsg()
1747     if msg:
1748       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1749       result = False
1750     elif not rstats.payload:
1751       lu.LogWarning("Can't find disk on node %s", node)
1752       result = False
1753     else:
1754       result = result and (not rstats.payload[idx])
1755   if dev.children:
1756     for child in dev.children:
1757       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1758
1759   return result
1760
1761
1762 class LUDiagnoseOS(NoHooksLU):
1763   """Logical unit for OS diagnose/query.
1764
1765   """
1766   _OP_REQP = ["output_fields", "names"]
1767   REQ_BGL = False
1768   _FIELDS_STATIC = utils.FieldSet()
1769   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1770
1771   def ExpandNames(self):
1772     if self.op.names:
1773       raise errors.OpPrereqError("Selective OS query not supported")
1774
1775     _CheckOutputFields(static=self._FIELDS_STATIC,
1776                        dynamic=self._FIELDS_DYNAMIC,
1777                        selected=self.op.output_fields)
1778
1779     # Lock all nodes, in shared mode
1780     # Temporary removal of locks, should be reverted later
1781     # TODO: reintroduce locks when they are lighter-weight
1782     self.needed_locks = {}
1783     #self.share_locks[locking.LEVEL_NODE] = 1
1784     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1785
1786   def CheckPrereq(self):
1787     """Check prerequisites.
1788
1789     """
1790
1791   @staticmethod
1792   def _DiagnoseByOS(node_list, rlist):
1793     """Remaps a per-node return list into an a per-os per-node dictionary
1794
1795     @param node_list: a list with the names of all nodes
1796     @param rlist: a map with node names as keys and OS objects as values
1797
1798     @rtype: dict
1799     @return: a dictionary with osnames as keys and as value another map, with
1800         nodes as keys and list of OS objects as values, eg::
1801
1802           {"debian-etch": {"node1": [<object>,...],
1803                            "node2": [<object>,]}
1804           }
1805
1806     """
1807     all_os = {}
1808     # we build here the list of nodes that didn't fail the RPC (at RPC
1809     # level), so that nodes with a non-responding node daemon don't
1810     # make all OSes invalid
1811     good_nodes = [node_name for node_name in rlist
1812                   if not rlist[node_name].failed]
1813     for node_name, nr in rlist.iteritems():
1814       if nr.failed or not nr.data:
1815         continue
1816       for os_obj in nr.data:
1817         if os_obj.name not in all_os:
1818           # build a list of nodes for this os containing empty lists
1819           # for each node in node_list
1820           all_os[os_obj.name] = {}
1821           for nname in good_nodes:
1822             all_os[os_obj.name][nname] = []
1823         all_os[os_obj.name][node_name].append(os_obj)
1824     return all_os
1825
1826   def Exec(self, feedback_fn):
1827     """Compute the list of OSes.
1828
1829     """
1830     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1831     node_data = self.rpc.call_os_diagnose(valid_nodes)
1832     if node_data == False:
1833       raise errors.OpExecError("Can't gather the list of OSes")
1834     pol = self._DiagnoseByOS(valid_nodes, node_data)
1835     output = []
1836     for os_name, os_data in pol.iteritems():
1837       row = []
1838       for field in self.op.output_fields:
1839         if field == "name":
1840           val = os_name
1841         elif field == "valid":
1842           val = utils.all([osl and osl[0] for osl in os_data.values()])
1843         elif field == "node_status":
1844           val = {}
1845           for node_name, nos_list in os_data.iteritems():
1846             val[node_name] = [(v.status, v.path) for v in nos_list]
1847         else:
1848           raise errors.ParameterError(field)
1849         row.append(val)
1850       output.append(row)
1851
1852     return output
1853
1854
1855 class LURemoveNode(LogicalUnit):
1856   """Logical unit for removing a node.
1857
1858   """
1859   HPATH = "node-remove"
1860   HTYPE = constants.HTYPE_NODE
1861   _OP_REQP = ["node_name"]
1862
1863   def BuildHooksEnv(self):
1864     """Build hooks env.
1865
1866     This doesn't run on the target node in the pre phase as a failed
1867     node would then be impossible to remove.
1868
1869     """
1870     env = {
1871       "OP_TARGET": self.op.node_name,
1872       "NODE_NAME": self.op.node_name,
1873       }
1874     all_nodes = self.cfg.GetNodeList()
1875     all_nodes.remove(self.op.node_name)
1876     return env, all_nodes, all_nodes
1877
1878   def CheckPrereq(self):
1879     """Check prerequisites.
1880
1881     This checks:
1882      - the node exists in the configuration
1883      - it does not have primary or secondary instances
1884      - it's not the master
1885
1886     Any errors are signalled by raising errors.OpPrereqError.
1887
1888     """
1889     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1890     if node is None:
1891       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1892
1893     instance_list = self.cfg.GetInstanceList()
1894
1895     masternode = self.cfg.GetMasterNode()
1896     if node.name == masternode:
1897       raise errors.OpPrereqError("Node is the master node,"
1898                                  " you need to failover first.")
1899
1900     for instance_name in instance_list:
1901       instance = self.cfg.GetInstanceInfo(instance_name)
1902       if node.name in instance.all_nodes:
1903         raise errors.OpPrereqError("Instance %s is still running on the node,"
1904                                    " please remove first." % instance_name)
1905     self.op.node_name = node.name
1906     self.node = node
1907
1908   def Exec(self, feedback_fn):
1909     """Removes the node from the cluster.
1910
1911     """
1912     node = self.node
1913     logging.info("Stopping the node daemon and removing configs from node %s",
1914                  node.name)
1915
1916     self.context.RemoveNode(node.name)
1917
1918     self.rpc.call_node_leave_cluster(node.name)
1919
1920     # Promote nodes to master candidate as needed
1921     _AdjustCandidatePool(self)
1922
1923
1924 class LUQueryNodes(NoHooksLU):
1925   """Logical unit for querying nodes.
1926
1927   """
1928   _OP_REQP = ["output_fields", "names", "use_locking"]
1929   REQ_BGL = False
1930   _FIELDS_DYNAMIC = utils.FieldSet(
1931     "dtotal", "dfree",
1932     "mtotal", "mnode", "mfree",
1933     "bootid",
1934     "ctotal", "cnodes", "csockets",
1935     )
1936
1937   _FIELDS_STATIC = utils.FieldSet(
1938     "name", "pinst_cnt", "sinst_cnt",
1939     "pinst_list", "sinst_list",
1940     "pip", "sip", "tags",
1941     "serial_no",
1942     "master_candidate",
1943     "master",
1944     "offline",
1945     "drained",
1946     )
1947
1948   def ExpandNames(self):
1949     _CheckOutputFields(static=self._FIELDS_STATIC,
1950                        dynamic=self._FIELDS_DYNAMIC,
1951                        selected=self.op.output_fields)
1952
1953     self.needed_locks = {}
1954     self.share_locks[locking.LEVEL_NODE] = 1
1955
1956     if self.op.names:
1957       self.wanted = _GetWantedNodes(self, self.op.names)
1958     else:
1959       self.wanted = locking.ALL_SET
1960
1961     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1962     self.do_locking = self.do_node_query and self.op.use_locking
1963     if self.do_locking:
1964       # if we don't request only static fields, we need to lock the nodes
1965       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1966
1967
1968   def CheckPrereq(self):
1969     """Check prerequisites.
1970
1971     """
1972     # The validation of the node list is done in the _GetWantedNodes,
1973     # if non empty, and if empty, there's no validation to do
1974     pass
1975
1976   def Exec(self, feedback_fn):
1977     """Computes the list of nodes and their attributes.
1978
1979     """
1980     all_info = self.cfg.GetAllNodesInfo()
1981     if self.do_locking:
1982       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1983     elif self.wanted != locking.ALL_SET:
1984       nodenames = self.wanted
1985       missing = set(nodenames).difference(all_info.keys())
1986       if missing:
1987         raise errors.OpExecError(
1988           "Some nodes were removed before retrieving their data: %s" % missing)
1989     else:
1990       nodenames = all_info.keys()
1991
1992     nodenames = utils.NiceSort(nodenames)
1993     nodelist = [all_info[name] for name in nodenames]
1994
1995     # begin data gathering
1996
1997     if self.do_node_query:
1998       live_data = {}
1999       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2000                                           self.cfg.GetHypervisorType())
2001       for name in nodenames:
2002         nodeinfo = node_data[name]
2003         if not nodeinfo.failed and nodeinfo.data:
2004           nodeinfo = nodeinfo.data
2005           fn = utils.TryConvert
2006           live_data[name] = {
2007             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2008             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2009             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2010             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2011             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2012             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2013             "bootid": nodeinfo.get('bootid', None),
2014             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2015             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2016             }
2017         else:
2018           live_data[name] = {}
2019     else:
2020       live_data = dict.fromkeys(nodenames, {})
2021
2022     node_to_primary = dict([(name, set()) for name in nodenames])
2023     node_to_secondary = dict([(name, set()) for name in nodenames])
2024
2025     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2026                              "sinst_cnt", "sinst_list"))
2027     if inst_fields & frozenset(self.op.output_fields):
2028       instancelist = self.cfg.GetInstanceList()
2029
2030       for instance_name in instancelist:
2031         inst = self.cfg.GetInstanceInfo(instance_name)
2032         if inst.primary_node in node_to_primary:
2033           node_to_primary[inst.primary_node].add(inst.name)
2034         for secnode in inst.secondary_nodes:
2035           if secnode in node_to_secondary:
2036             node_to_secondary[secnode].add(inst.name)
2037
2038     master_node = self.cfg.GetMasterNode()
2039
2040     # end data gathering
2041
2042     output = []
2043     for node in nodelist:
2044       node_output = []
2045       for field in self.op.output_fields:
2046         if field == "name":
2047           val = node.name
2048         elif field == "pinst_list":
2049           val = list(node_to_primary[node.name])
2050         elif field == "sinst_list":
2051           val = list(node_to_secondary[node.name])
2052         elif field == "pinst_cnt":
2053           val = len(node_to_primary[node.name])
2054         elif field == "sinst_cnt":
2055           val = len(node_to_secondary[node.name])
2056         elif field == "pip":
2057           val = node.primary_ip
2058         elif field == "sip":
2059           val = node.secondary_ip
2060         elif field == "tags":
2061           val = list(node.GetTags())
2062         elif field == "serial_no":
2063           val = node.serial_no
2064         elif field == "master_candidate":
2065           val = node.master_candidate
2066         elif field == "master":
2067           val = node.name == master_node
2068         elif field == "offline":
2069           val = node.offline
2070         elif field == "drained":
2071           val = node.drained
2072         elif self._FIELDS_DYNAMIC.Matches(field):
2073           val = live_data[node.name].get(field, None)
2074         else:
2075           raise errors.ParameterError(field)
2076         node_output.append(val)
2077       output.append(node_output)
2078
2079     return output
2080
2081
2082 class LUQueryNodeVolumes(NoHooksLU):
2083   """Logical unit for getting volumes on node(s).
2084
2085   """
2086   _OP_REQP = ["nodes", "output_fields"]
2087   REQ_BGL = False
2088   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2089   _FIELDS_STATIC = utils.FieldSet("node")
2090
2091   def ExpandNames(self):
2092     _CheckOutputFields(static=self._FIELDS_STATIC,
2093                        dynamic=self._FIELDS_DYNAMIC,
2094                        selected=self.op.output_fields)
2095
2096     self.needed_locks = {}
2097     self.share_locks[locking.LEVEL_NODE] = 1
2098     if not self.op.nodes:
2099       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2100     else:
2101       self.needed_locks[locking.LEVEL_NODE] = \
2102         _GetWantedNodes(self, self.op.nodes)
2103
2104   def CheckPrereq(self):
2105     """Check prerequisites.
2106
2107     This checks that the fields required are valid output fields.
2108
2109     """
2110     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2111
2112   def Exec(self, feedback_fn):
2113     """Computes the list of nodes and their attributes.
2114
2115     """
2116     nodenames = self.nodes
2117     volumes = self.rpc.call_node_volumes(nodenames)
2118
2119     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2120              in self.cfg.GetInstanceList()]
2121
2122     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2123
2124     output = []
2125     for node in nodenames:
2126       if node not in volumes or volumes[node].failed or not volumes[node].data:
2127         continue
2128
2129       node_vols = volumes[node].data[:]
2130       node_vols.sort(key=lambda vol: vol['dev'])
2131
2132       for vol in node_vols:
2133         node_output = []
2134         for field in self.op.output_fields:
2135           if field == "node":
2136             val = node
2137           elif field == "phys":
2138             val = vol['dev']
2139           elif field == "vg":
2140             val = vol['vg']
2141           elif field == "name":
2142             val = vol['name']
2143           elif field == "size":
2144             val = int(float(vol['size']))
2145           elif field == "instance":
2146             for inst in ilist:
2147               if node not in lv_by_node[inst]:
2148                 continue
2149               if vol['name'] in lv_by_node[inst][node]:
2150                 val = inst.name
2151                 break
2152             else:
2153               val = '-'
2154           else:
2155             raise errors.ParameterError(field)
2156           node_output.append(str(val))
2157
2158         output.append(node_output)
2159
2160     return output
2161
2162
2163 class LUAddNode(LogicalUnit):
2164   """Logical unit for adding node to the cluster.
2165
2166   """
2167   HPATH = "node-add"
2168   HTYPE = constants.HTYPE_NODE
2169   _OP_REQP = ["node_name"]
2170
2171   def BuildHooksEnv(self):
2172     """Build hooks env.
2173
2174     This will run on all nodes before, and on all nodes + the new node after.
2175
2176     """
2177     env = {
2178       "OP_TARGET": self.op.node_name,
2179       "NODE_NAME": self.op.node_name,
2180       "NODE_PIP": self.op.primary_ip,
2181       "NODE_SIP": self.op.secondary_ip,
2182       }
2183     nodes_0 = self.cfg.GetNodeList()
2184     nodes_1 = nodes_0 + [self.op.node_name, ]
2185     return env, nodes_0, nodes_1
2186
2187   def CheckPrereq(self):
2188     """Check prerequisites.
2189
2190     This checks:
2191      - the new node is not already in the config
2192      - it is resolvable
2193      - its parameters (single/dual homed) matches the cluster
2194
2195     Any errors are signalled by raising errors.OpPrereqError.
2196
2197     """
2198     node_name = self.op.node_name
2199     cfg = self.cfg
2200
2201     dns_data = utils.HostInfo(node_name)
2202
2203     node = dns_data.name
2204     primary_ip = self.op.primary_ip = dns_data.ip
2205     secondary_ip = getattr(self.op, "secondary_ip", None)
2206     if secondary_ip is None:
2207       secondary_ip = primary_ip
2208     if not utils.IsValidIP(secondary_ip):
2209       raise errors.OpPrereqError("Invalid secondary IP given")
2210     self.op.secondary_ip = secondary_ip
2211
2212     node_list = cfg.GetNodeList()
2213     if not self.op.readd and node in node_list:
2214       raise errors.OpPrereqError("Node %s is already in the configuration" %
2215                                  node)
2216     elif self.op.readd and node not in node_list:
2217       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2218
2219     for existing_node_name in node_list:
2220       existing_node = cfg.GetNodeInfo(existing_node_name)
2221
2222       if self.op.readd and node == existing_node_name:
2223         if (existing_node.primary_ip != primary_ip or
2224             existing_node.secondary_ip != secondary_ip):
2225           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2226                                      " address configuration as before")
2227         continue
2228
2229       if (existing_node.primary_ip == primary_ip or
2230           existing_node.secondary_ip == primary_ip or
2231           existing_node.primary_ip == secondary_ip or
2232           existing_node.secondary_ip == secondary_ip):
2233         raise errors.OpPrereqError("New node ip address(es) conflict with"
2234                                    " existing node %s" % existing_node.name)
2235
2236     # check that the type of the node (single versus dual homed) is the
2237     # same as for the master
2238     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2239     master_singlehomed = myself.secondary_ip == myself.primary_ip
2240     newbie_singlehomed = secondary_ip == primary_ip
2241     if master_singlehomed != newbie_singlehomed:
2242       if master_singlehomed:
2243         raise errors.OpPrereqError("The master has no private ip but the"
2244                                    " new node has one")
2245       else:
2246         raise errors.OpPrereqError("The master has a private ip but the"
2247                                    " new node doesn't have one")
2248
2249     # checks reachablity
2250     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2251       raise errors.OpPrereqError("Node not reachable by ping")
2252
2253     if not newbie_singlehomed:
2254       # check reachability from my secondary ip to newbie's secondary ip
2255       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2256                            source=myself.secondary_ip):
2257         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2258                                    " based ping to noded port")
2259
2260     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2261     mc_now, _ = self.cfg.GetMasterCandidateStats()
2262     master_candidate = mc_now < cp_size
2263
2264     self.new_node = objects.Node(name=node,
2265                                  primary_ip=primary_ip,
2266                                  secondary_ip=secondary_ip,
2267                                  master_candidate=master_candidate,
2268                                  offline=False, drained=False)
2269
2270   def Exec(self, feedback_fn):
2271     """Adds the new node to the cluster.
2272
2273     """
2274     new_node = self.new_node
2275     node = new_node.name
2276
2277     # check connectivity
2278     result = self.rpc.call_version([node])[node]
2279     result.Raise()
2280     if result.data:
2281       if constants.PROTOCOL_VERSION == result.data:
2282         logging.info("Communication to node %s fine, sw version %s match",
2283                      node, result.data)
2284       else:
2285         raise errors.OpExecError("Version mismatch master version %s,"
2286                                  " node version %s" %
2287                                  (constants.PROTOCOL_VERSION, result.data))
2288     else:
2289       raise errors.OpExecError("Cannot get version from the new node")
2290
2291     # setup ssh on node
2292     logging.info("Copy ssh key to node %s", node)
2293     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2294     keyarray = []
2295     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2296                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2297                 priv_key, pub_key]
2298
2299     for i in keyfiles:
2300       f = open(i, 'r')
2301       try:
2302         keyarray.append(f.read())
2303       finally:
2304         f.close()
2305
2306     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2307                                     keyarray[2],
2308                                     keyarray[3], keyarray[4], keyarray[5])
2309
2310     msg = result.RemoteFailMsg()
2311     if msg:
2312       raise errors.OpExecError("Cannot transfer ssh keys to the"
2313                                " new node: %s" % msg)
2314
2315     # Add node to our /etc/hosts, and add key to known_hosts
2316     if self.cfg.GetClusterInfo().modify_etc_hosts:
2317       utils.AddHostToEtcHosts(new_node.name)
2318
2319     if new_node.secondary_ip != new_node.primary_ip:
2320       result = self.rpc.call_node_has_ip_address(new_node.name,
2321                                                  new_node.secondary_ip)
2322       if result.failed or not result.data:
2323         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2324                                  " you gave (%s). Please fix and re-run this"
2325                                  " command." % new_node.secondary_ip)
2326
2327     node_verify_list = [self.cfg.GetMasterNode()]
2328     node_verify_param = {
2329       'nodelist': [node],
2330       # TODO: do a node-net-test as well?
2331     }
2332
2333     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2334                                        self.cfg.GetClusterName())
2335     for verifier in node_verify_list:
2336       if result[verifier].failed or not result[verifier].data:
2337         raise errors.OpExecError("Cannot communicate with %s's node daemon"
2338                                  " for remote verification" % verifier)
2339       if result[verifier].data['nodelist']:
2340         for failed in result[verifier].data['nodelist']:
2341           feedback_fn("ssh/hostname verification failed %s -> %s" %
2342                       (verifier, result[verifier].data['nodelist'][failed]))
2343         raise errors.OpExecError("ssh/hostname verification failed.")
2344
2345     if self.op.readd:
2346       _RedistributeAncillaryFiles(self)
2347       self.context.ReaddNode(new_node)
2348     else:
2349       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2350       self.context.AddNode(new_node)
2351
2352
2353 class LUSetNodeParams(LogicalUnit):
2354   """Modifies the parameters of a node.
2355
2356   """
2357   HPATH = "node-modify"
2358   HTYPE = constants.HTYPE_NODE
2359   _OP_REQP = ["node_name"]
2360   REQ_BGL = False
2361
2362   def CheckArguments(self):
2363     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2364     if node_name is None:
2365       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2366     self.op.node_name = node_name
2367     _CheckBooleanOpField(self.op, 'master_candidate')
2368     _CheckBooleanOpField(self.op, 'offline')
2369     _CheckBooleanOpField(self.op, 'drained')
2370     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2371     if all_mods.count(None) == 3:
2372       raise errors.OpPrereqError("Please pass at least one modification")
2373     if all_mods.count(True) > 1:
2374       raise errors.OpPrereqError("Can't set the node into more than one"
2375                                  " state at the same time")
2376
2377   def ExpandNames(self):
2378     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2379
2380   def BuildHooksEnv(self):
2381     """Build hooks env.
2382
2383     This runs on the master node.
2384
2385     """
2386     env = {
2387       "OP_TARGET": self.op.node_name,
2388       "MASTER_CANDIDATE": str(self.op.master_candidate),
2389       "OFFLINE": str(self.op.offline),
2390       "DRAINED": str(self.op.drained),
2391       }
2392     nl = [self.cfg.GetMasterNode(),
2393           self.op.node_name]
2394     return env, nl, nl
2395
2396   def CheckPrereq(self):
2397     """Check prerequisites.
2398
2399     This only checks the instance list against the existing names.
2400
2401     """
2402     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2403
2404     if ((self.op.master_candidate == False or self.op.offline == True or
2405          self.op.drained == True) and node.master_candidate):
2406       # we will demote the node from master_candidate
2407       if self.op.node_name == self.cfg.GetMasterNode():
2408         raise errors.OpPrereqError("The master node has to be a"
2409                                    " master candidate, online and not drained")
2410       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2411       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2412       if num_candidates <= cp_size:
2413         msg = ("Not enough master candidates (desired"
2414                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2415         if self.op.force:
2416           self.LogWarning(msg)
2417         else:
2418           raise errors.OpPrereqError(msg)
2419
2420     if (self.op.master_candidate == True and
2421         ((node.offline and not self.op.offline == False) or
2422          (node.drained and not self.op.drained == False))):
2423       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2424                                  " to master_candidate" % node.name)
2425
2426     return
2427
2428   def Exec(self, feedback_fn):
2429     """Modifies a node.
2430
2431     """
2432     node = self.node
2433
2434     result = []
2435     changed_mc = False
2436
2437     if self.op.offline is not None:
2438       node.offline = self.op.offline
2439       result.append(("offline", str(self.op.offline)))
2440       if self.op.offline == True:
2441         if node.master_candidate:
2442           node.master_candidate = False
2443           changed_mc = True
2444           result.append(("master_candidate", "auto-demotion due to offline"))
2445         if node.drained:
2446           node.drained = False
2447           result.append(("drained", "clear drained status due to offline"))
2448
2449     if self.op.master_candidate is not None:
2450       node.master_candidate = self.op.master_candidate
2451       changed_mc = True
2452       result.append(("master_candidate", str(self.op.master_candidate)))
2453       if self.op.master_candidate == False:
2454         rrc = self.rpc.call_node_demote_from_mc(node.name)
2455         msg = rrc.RemoteFailMsg()
2456         if msg:
2457           self.LogWarning("Node failed to demote itself: %s" % msg)
2458
2459     if self.op.drained is not None:
2460       node.drained = self.op.drained
2461       result.append(("drained", str(self.op.drained)))
2462       if self.op.drained == True:
2463         if node.master_candidate:
2464           node.master_candidate = False
2465           changed_mc = True
2466           result.append(("master_candidate", "auto-demotion due to drain"))
2467         if node.offline:
2468           node.offline = False
2469           result.append(("offline", "clear offline status due to drain"))
2470
2471     # this will trigger configuration file update, if needed
2472     self.cfg.Update(node)
2473     # this will trigger job queue propagation or cleanup
2474     if changed_mc:
2475       self.context.ReaddNode(node)
2476
2477     return result
2478
2479
2480 class LUPowercycleNode(NoHooksLU):
2481   """Powercycles a node.
2482
2483   """
2484   _OP_REQP = ["node_name", "force"]
2485   REQ_BGL = False
2486
2487   def CheckArguments(self):
2488     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2489     if node_name is None:
2490       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2491     self.op.node_name = node_name
2492     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2493       raise errors.OpPrereqError("The node is the master and the force"
2494                                  " parameter was not set")
2495
2496   def ExpandNames(self):
2497     """Locking for PowercycleNode.
2498
2499     This is a last-resource option and shouldn't block on other
2500     jobs. Therefore, we grab no locks.
2501
2502     """
2503     self.needed_locks = {}
2504
2505   def CheckPrereq(self):
2506     """Check prerequisites.
2507
2508     This LU has no prereqs.
2509
2510     """
2511     pass
2512
2513   def Exec(self, feedback_fn):
2514     """Reboots a node.
2515
2516     """
2517     result = self.rpc.call_node_powercycle(self.op.node_name,
2518                                            self.cfg.GetHypervisorType())
2519     msg = result.RemoteFailMsg()
2520     if msg:
2521       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2522     return result.payload
2523
2524
2525 class LUQueryClusterInfo(NoHooksLU):
2526   """Query cluster configuration.
2527
2528   """
2529   _OP_REQP = []
2530   REQ_BGL = False
2531
2532   def ExpandNames(self):
2533     self.needed_locks = {}
2534
2535   def CheckPrereq(self):
2536     """No prerequsites needed for this LU.
2537
2538     """
2539     pass
2540
2541   def Exec(self, feedback_fn):
2542     """Return cluster config.
2543
2544     """
2545     cluster = self.cfg.GetClusterInfo()
2546     result = {
2547       "software_version": constants.RELEASE_VERSION,
2548       "protocol_version": constants.PROTOCOL_VERSION,
2549       "config_version": constants.CONFIG_VERSION,
2550       "os_api_version": constants.OS_API_VERSION,
2551       "export_version": constants.EXPORT_VERSION,
2552       "architecture": (platform.architecture()[0], platform.machine()),
2553       "name": cluster.cluster_name,
2554       "master": cluster.master_node,
2555       "default_hypervisor": cluster.default_hypervisor,
2556       "enabled_hypervisors": cluster.enabled_hypervisors,
2557       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2558                         for hypervisor in cluster.enabled_hypervisors]),
2559       "beparams": cluster.beparams,
2560       "nicparams": cluster.nicparams,
2561       "candidate_pool_size": cluster.candidate_pool_size,
2562       "master_netdev": cluster.master_netdev,
2563       "volume_group_name": cluster.volume_group_name,
2564       "file_storage_dir": cluster.file_storage_dir,
2565       }
2566
2567     return result
2568
2569
2570 class LUQueryConfigValues(NoHooksLU):
2571   """Return configuration values.
2572
2573   """
2574   _OP_REQP = []
2575   REQ_BGL = False
2576   _FIELDS_DYNAMIC = utils.FieldSet()
2577   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2578
2579   def ExpandNames(self):
2580     self.needed_locks = {}
2581
2582     _CheckOutputFields(static=self._FIELDS_STATIC,
2583                        dynamic=self._FIELDS_DYNAMIC,
2584                        selected=self.op.output_fields)
2585
2586   def CheckPrereq(self):
2587     """No prerequisites.
2588
2589     """
2590     pass
2591
2592   def Exec(self, feedback_fn):
2593     """Dump a representation of the cluster config to the standard output.
2594
2595     """
2596     values = []
2597     for field in self.op.output_fields:
2598       if field == "cluster_name":
2599         entry = self.cfg.GetClusterName()
2600       elif field == "master_node":
2601         entry = self.cfg.GetMasterNode()
2602       elif field == "drain_flag":
2603         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2604       else:
2605         raise errors.ParameterError(field)
2606       values.append(entry)
2607     return values
2608
2609
2610 class LUActivateInstanceDisks(NoHooksLU):
2611   """Bring up an instance's disks.
2612
2613   """
2614   _OP_REQP = ["instance_name"]
2615   REQ_BGL = False
2616
2617   def ExpandNames(self):
2618     self._ExpandAndLockInstance()
2619     self.needed_locks[locking.LEVEL_NODE] = []
2620     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2621
2622   def DeclareLocks(self, level):
2623     if level == locking.LEVEL_NODE:
2624       self._LockInstancesNodes()
2625
2626   def CheckPrereq(self):
2627     """Check prerequisites.
2628
2629     This checks that the instance is in the cluster.
2630
2631     """
2632     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2633     assert self.instance is not None, \
2634       "Cannot retrieve locked instance %s" % self.op.instance_name
2635     _CheckNodeOnline(self, self.instance.primary_node)
2636
2637   def Exec(self, feedback_fn):
2638     """Activate the disks.
2639
2640     """
2641     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2642     if not disks_ok:
2643       raise errors.OpExecError("Cannot activate block devices")
2644
2645     return disks_info
2646
2647
2648 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2649   """Prepare the block devices for an instance.
2650
2651   This sets up the block devices on all nodes.
2652
2653   @type lu: L{LogicalUnit}
2654   @param lu: the logical unit on whose behalf we execute
2655   @type instance: L{objects.Instance}
2656   @param instance: the instance for whose disks we assemble
2657   @type ignore_secondaries: boolean
2658   @param ignore_secondaries: if true, errors on secondary nodes
2659       won't result in an error return from the function
2660   @return: False if the operation failed, otherwise a list of
2661       (host, instance_visible_name, node_visible_name)
2662       with the mapping from node devices to instance devices
2663
2664   """
2665   device_info = []
2666   disks_ok = True
2667   iname = instance.name
2668   # With the two passes mechanism we try to reduce the window of
2669   # opportunity for the race condition of switching DRBD to primary
2670   # before handshaking occured, but we do not eliminate it
2671
2672   # The proper fix would be to wait (with some limits) until the
2673   # connection has been made and drbd transitions from WFConnection
2674   # into any other network-connected state (Connected, SyncTarget,
2675   # SyncSource, etc.)
2676
2677   # 1st pass, assemble on all nodes in secondary mode
2678   for inst_disk in instance.disks:
2679     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2680       lu.cfg.SetDiskID(node_disk, node)
2681       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2682       msg = result.RemoteFailMsg()
2683       if msg:
2684         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2685                            " (is_primary=False, pass=1): %s",
2686                            inst_disk.iv_name, node, msg)
2687         if not ignore_secondaries:
2688           disks_ok = False
2689
2690   # FIXME: race condition on drbd migration to primary
2691
2692   # 2nd pass, do only the primary node
2693   for inst_disk in instance.disks:
2694     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2695       if node != instance.primary_node:
2696         continue
2697       lu.cfg.SetDiskID(node_disk, node)
2698       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2699       msg = result.RemoteFailMsg()
2700       if msg:
2701         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2702                            " (is_primary=True, pass=2): %s",
2703                            inst_disk.iv_name, node, msg)
2704         disks_ok = False
2705     device_info.append((instance.primary_node, inst_disk.iv_name,
2706                         result.payload))
2707
2708   # leave the disks configured for the primary node
2709   # this is a workaround that would be fixed better by
2710   # improving the logical/physical id handling
2711   for disk in instance.disks:
2712     lu.cfg.SetDiskID(disk, instance.primary_node)
2713
2714   return disks_ok, device_info
2715
2716
2717 def _StartInstanceDisks(lu, instance, force):
2718   """Start the disks of an instance.
2719
2720   """
2721   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2722                                            ignore_secondaries=force)
2723   if not disks_ok:
2724     _ShutdownInstanceDisks(lu, instance)
2725     if force is not None and not force:
2726       lu.proc.LogWarning("", hint="If the message above refers to a"
2727                          " secondary node,"
2728                          " you can retry the operation using '--force'.")
2729     raise errors.OpExecError("Disk consistency error")
2730
2731
2732 class LUDeactivateInstanceDisks(NoHooksLU):
2733   """Shutdown an instance's disks.
2734
2735   """
2736   _OP_REQP = ["instance_name"]
2737   REQ_BGL = False
2738
2739   def ExpandNames(self):
2740     self._ExpandAndLockInstance()
2741     self.needed_locks[locking.LEVEL_NODE] = []
2742     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2743
2744   def DeclareLocks(self, level):
2745     if level == locking.LEVEL_NODE:
2746       self._LockInstancesNodes()
2747
2748   def CheckPrereq(self):
2749     """Check prerequisites.
2750
2751     This checks that the instance is in the cluster.
2752
2753     """
2754     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2755     assert self.instance is not None, \
2756       "Cannot retrieve locked instance %s" % self.op.instance_name
2757
2758   def Exec(self, feedback_fn):
2759     """Deactivate the disks
2760
2761     """
2762     instance = self.instance
2763     _SafeShutdownInstanceDisks(self, instance)
2764
2765
2766 def _SafeShutdownInstanceDisks(lu, instance):
2767   """Shutdown block devices of an instance.
2768
2769   This function checks if an instance is running, before calling
2770   _ShutdownInstanceDisks.
2771
2772   """
2773   ins_l = lu.rpc.call_instance_list([instance.primary_node],
2774                                       [instance.hypervisor])
2775   ins_l = ins_l[instance.primary_node]
2776   if ins_l.failed or not isinstance(ins_l.data, list):
2777     raise errors.OpExecError("Can't contact node '%s'" %
2778                              instance.primary_node)
2779
2780   if instance.name in ins_l.data:
2781     raise errors.OpExecError("Instance is running, can't shutdown"
2782                              " block devices.")
2783
2784   _ShutdownInstanceDisks(lu, instance)
2785
2786
2787 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2788   """Shutdown block devices of an instance.
2789
2790   This does the shutdown on all nodes of the instance.
2791
2792   If the ignore_primary is false, errors on the primary node are
2793   ignored.
2794
2795   """
2796   all_result = True
2797   for disk in instance.disks:
2798     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2799       lu.cfg.SetDiskID(top_disk, node)
2800       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2801       msg = result.RemoteFailMsg()
2802       if msg:
2803         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2804                       disk.iv_name, node, msg)
2805         if not ignore_primary or node != instance.primary_node:
2806           all_result = False
2807   return all_result
2808
2809
2810 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2811   """Checks if a node has enough free memory.
2812
2813   This function check if a given node has the needed amount of free
2814   memory. In case the node has less memory or we cannot get the
2815   information from the node, this function raise an OpPrereqError
2816   exception.
2817
2818   @type lu: C{LogicalUnit}
2819   @param lu: a logical unit from which we get configuration data
2820   @type node: C{str}
2821   @param node: the node to check
2822   @type reason: C{str}
2823   @param reason: string to use in the error message
2824   @type requested: C{int}
2825   @param requested: the amount of memory in MiB to check for
2826   @type hypervisor_name: C{str}
2827   @param hypervisor_name: the hypervisor to ask for memory stats
2828   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2829       we cannot check the node
2830
2831   """
2832   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2833   nodeinfo[node].Raise()
2834   free_mem = nodeinfo[node].data.get('memory_free')
2835   if not isinstance(free_mem, int):
2836     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2837                              " was '%s'" % (node, free_mem))
2838   if requested > free_mem:
2839     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2840                              " needed %s MiB, available %s MiB" %
2841                              (node, reason, requested, free_mem))
2842
2843
2844 class LUStartupInstance(LogicalUnit):
2845   """Starts an instance.
2846
2847   """
2848   HPATH = "instance-start"
2849   HTYPE = constants.HTYPE_INSTANCE
2850   _OP_REQP = ["instance_name", "force"]
2851   REQ_BGL = False
2852
2853   def ExpandNames(self):
2854     self._ExpandAndLockInstance()
2855
2856   def BuildHooksEnv(self):
2857     """Build hooks env.
2858
2859     This runs on master, primary and secondary nodes of the instance.
2860
2861     """
2862     env = {
2863       "FORCE": self.op.force,
2864       }
2865     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2866     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2867     return env, nl, nl
2868
2869   def CheckPrereq(self):
2870     """Check prerequisites.
2871
2872     This checks that the instance is in the cluster.
2873
2874     """
2875     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2876     assert self.instance is not None, \
2877       "Cannot retrieve locked instance %s" % self.op.instance_name
2878
2879     # extra beparams
2880     self.beparams = getattr(self.op, "beparams", {})
2881     if self.beparams:
2882       if not isinstance(self.beparams, dict):
2883         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2884                                    " dict" % (type(self.beparams), ))
2885       # fill the beparams dict
2886       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2887       self.op.beparams = self.beparams
2888
2889     # extra hvparams
2890     self.hvparams = getattr(self.op, "hvparams", {})
2891     if self.hvparams:
2892       if not isinstance(self.hvparams, dict):
2893         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2894                                    " dict" % (type(self.hvparams), ))
2895
2896       # check hypervisor parameter syntax (locally)
2897       cluster = self.cfg.GetClusterInfo()
2898       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2899       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2900                                     instance.hvparams)
2901       filled_hvp.update(self.hvparams)
2902       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2903       hv_type.CheckParameterSyntax(filled_hvp)
2904       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2905       self.op.hvparams = self.hvparams
2906
2907     _CheckNodeOnline(self, instance.primary_node)
2908
2909     bep = self.cfg.GetClusterInfo().FillBE(instance)
2910     # check bridges existance
2911     _CheckInstanceBridgesExist(self, instance)
2912
2913     remote_info = self.rpc.call_instance_info(instance.primary_node,
2914                                               instance.name,
2915                                               instance.hypervisor)
2916     remote_info.Raise()
2917     if not remote_info.data:
2918       _CheckNodeFreeMemory(self, instance.primary_node,
2919                            "starting instance %s" % instance.name,
2920                            bep[constants.BE_MEMORY], instance.hypervisor)
2921
2922   def Exec(self, feedback_fn):
2923     """Start the instance.
2924
2925     """
2926     instance = self.instance
2927     force = self.op.force
2928
2929     self.cfg.MarkInstanceUp(instance.name)
2930
2931     node_current = instance.primary_node
2932
2933     _StartInstanceDisks(self, instance, force)
2934
2935     result = self.rpc.call_instance_start(node_current, instance,
2936                                           self.hvparams, self.beparams)
2937     msg = result.RemoteFailMsg()
2938     if msg:
2939       _ShutdownInstanceDisks(self, instance)
2940       raise errors.OpExecError("Could not start instance: %s" % msg)
2941
2942
2943 class LURebootInstance(LogicalUnit):
2944   """Reboot an instance.
2945
2946   """
2947   HPATH = "instance-reboot"
2948   HTYPE = constants.HTYPE_INSTANCE
2949   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2950   REQ_BGL = False
2951
2952   def ExpandNames(self):
2953     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2954                                    constants.INSTANCE_REBOOT_HARD,
2955                                    constants.INSTANCE_REBOOT_FULL]:
2956       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2957                                   (constants.INSTANCE_REBOOT_SOFT,
2958                                    constants.INSTANCE_REBOOT_HARD,
2959                                    constants.INSTANCE_REBOOT_FULL))
2960     self._ExpandAndLockInstance()
2961
2962   def BuildHooksEnv(self):
2963     """Build hooks env.
2964
2965     This runs on master, primary and secondary nodes of the instance.
2966
2967     """
2968     env = {
2969       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2970       "REBOOT_TYPE": self.op.reboot_type,
2971       }
2972     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2973     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2974     return env, nl, nl
2975
2976   def CheckPrereq(self):
2977     """Check prerequisites.
2978
2979     This checks that the instance is in the cluster.
2980
2981     """
2982     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2983     assert self.instance is not None, \
2984       "Cannot retrieve locked instance %s" % self.op.instance_name
2985
2986     _CheckNodeOnline(self, instance.primary_node)
2987
2988     # check bridges existance
2989     _CheckInstanceBridgesExist(self, instance)
2990
2991   def Exec(self, feedback_fn):
2992     """Reboot the instance.
2993
2994     """
2995     instance = self.instance
2996     ignore_secondaries = self.op.ignore_secondaries
2997     reboot_type = self.op.reboot_type
2998
2999     node_current = instance.primary_node
3000
3001     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3002                        constants.INSTANCE_REBOOT_HARD]:
3003       for disk in instance.disks:
3004         self.cfg.SetDiskID(disk, node_current)
3005       result = self.rpc.call_instance_reboot(node_current, instance,
3006                                              reboot_type)
3007       msg = result.RemoteFailMsg()
3008       if msg:
3009         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3010     else:
3011       result = self.rpc.call_instance_shutdown(node_current, instance)
3012       msg = result.RemoteFailMsg()
3013       if msg:
3014         raise errors.OpExecError("Could not shutdown instance for"
3015                                  " full reboot: %s" % msg)
3016       _ShutdownInstanceDisks(self, instance)
3017       _StartInstanceDisks(self, instance, ignore_secondaries)
3018       result = self.rpc.call_instance_start(node_current, instance, None, None)
3019       msg = result.RemoteFailMsg()
3020       if msg:
3021         _ShutdownInstanceDisks(self, instance)
3022         raise errors.OpExecError("Could not start instance for"
3023                                  " full reboot: %s" % msg)
3024
3025     self.cfg.MarkInstanceUp(instance.name)
3026
3027
3028 class LUShutdownInstance(LogicalUnit):
3029   """Shutdown an instance.
3030
3031   """
3032   HPATH = "instance-stop"
3033   HTYPE = constants.HTYPE_INSTANCE
3034   _OP_REQP = ["instance_name"]
3035   REQ_BGL = False
3036
3037   def ExpandNames(self):
3038     self._ExpandAndLockInstance()
3039
3040   def BuildHooksEnv(self):
3041     """Build hooks env.
3042
3043     This runs on master, primary and secondary nodes of the instance.
3044
3045     """
3046     env = _BuildInstanceHookEnvByObject(self, self.instance)
3047     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3048     return env, nl, nl
3049
3050   def CheckPrereq(self):
3051     """Check prerequisites.
3052
3053     This checks that the instance is in the cluster.
3054
3055     """
3056     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3057     assert self.instance is not None, \
3058       "Cannot retrieve locked instance %s" % self.op.instance_name
3059     _CheckNodeOnline(self, self.instance.primary_node)
3060
3061   def Exec(self, feedback_fn):
3062     """Shutdown the instance.
3063
3064     """
3065     instance = self.instance
3066     node_current = instance.primary_node
3067     self.cfg.MarkInstanceDown(instance.name)
3068     result = self.rpc.call_instance_shutdown(node_current, instance)
3069     msg = result.RemoteFailMsg()
3070     if msg:
3071       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3072
3073     _ShutdownInstanceDisks(self, instance)
3074
3075
3076 class LUReinstallInstance(LogicalUnit):
3077   """Reinstall an instance.
3078
3079   """
3080   HPATH = "instance-reinstall"
3081   HTYPE = constants.HTYPE_INSTANCE
3082   _OP_REQP = ["instance_name"]
3083   REQ_BGL = False
3084
3085   def ExpandNames(self):
3086     self._ExpandAndLockInstance()
3087
3088   def BuildHooksEnv(self):
3089     """Build hooks env.
3090
3091     This runs on master, primary and secondary nodes of the instance.
3092
3093     """
3094     env = _BuildInstanceHookEnvByObject(self, self.instance)
3095     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3096     return env, nl, nl
3097
3098   def CheckPrereq(self):
3099     """Check prerequisites.
3100
3101     This checks that the instance is in the cluster and is not running.
3102
3103     """
3104     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3105     assert instance is not None, \
3106       "Cannot retrieve locked instance %s" % self.op.instance_name
3107     _CheckNodeOnline(self, instance.primary_node)
3108
3109     if instance.disk_template == constants.DT_DISKLESS:
3110       raise errors.OpPrereqError("Instance '%s' has no disks" %
3111                                  self.op.instance_name)
3112     if instance.admin_up:
3113       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3114                                  self.op.instance_name)
3115     remote_info = self.rpc.call_instance_info(instance.primary_node,
3116                                               instance.name,
3117                                               instance.hypervisor)
3118     remote_info.Raise()
3119     if remote_info.data:
3120       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3121                                  (self.op.instance_name,
3122                                   instance.primary_node))
3123
3124     self.op.os_type = getattr(self.op, "os_type", None)
3125     if self.op.os_type is not None:
3126       # OS verification
3127       pnode = self.cfg.GetNodeInfo(
3128         self.cfg.ExpandNodeName(instance.primary_node))
3129       if pnode is None:
3130         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3131                                    self.op.pnode)
3132       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3133       result.Raise()
3134       if not isinstance(result.data, objects.OS):
3135         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3136                                    " primary node"  % self.op.os_type)
3137
3138     self.instance = instance
3139
3140   def Exec(self, feedback_fn):
3141     """Reinstall the instance.
3142
3143     """
3144     inst = self.instance
3145
3146     if self.op.os_type is not None:
3147       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3148       inst.os = self.op.os_type
3149       self.cfg.Update(inst)
3150
3151     _StartInstanceDisks(self, inst, None)
3152     try:
3153       feedback_fn("Running the instance OS create scripts...")
3154       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3155       msg = result.RemoteFailMsg()
3156       if msg:
3157         raise errors.OpExecError("Could not install OS for instance %s"
3158                                  " on node %s: %s" %
3159                                  (inst.name, inst.primary_node, msg))
3160     finally:
3161       _ShutdownInstanceDisks(self, inst)
3162
3163
3164 class LURenameInstance(LogicalUnit):
3165   """Rename an instance.
3166
3167   """
3168   HPATH = "instance-rename"
3169   HTYPE = constants.HTYPE_INSTANCE
3170   _OP_REQP = ["instance_name", "new_name"]
3171
3172   def BuildHooksEnv(self):
3173     """Build hooks env.
3174
3175     This runs on master, primary and secondary nodes of the instance.
3176
3177     """
3178     env = _BuildInstanceHookEnvByObject(self, self.instance)
3179     env["INSTANCE_NEW_NAME"] = self.op.new_name
3180     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3181     return env, nl, nl
3182
3183   def CheckPrereq(self):
3184     """Check prerequisites.
3185
3186     This checks that the instance is in the cluster and is not running.
3187
3188     """
3189     instance = self.cfg.GetInstanceInfo(
3190       self.cfg.ExpandInstanceName(self.op.instance_name))
3191     if instance is None:
3192       raise errors.OpPrereqError("Instance '%s' not known" %
3193                                  self.op.instance_name)
3194     _CheckNodeOnline(self, instance.primary_node)
3195
3196     if instance.admin_up:
3197       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3198                                  self.op.instance_name)
3199     remote_info = self.rpc.call_instance_info(instance.primary_node,
3200                                               instance.name,
3201                                               instance.hypervisor)
3202     remote_info.Raise()
3203     if remote_info.data:
3204       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3205                                  (self.op.instance_name,
3206                                   instance.primary_node))
3207     self.instance = instance
3208
3209     # new name verification
3210     name_info = utils.HostInfo(self.op.new_name)
3211
3212     self.op.new_name = new_name = name_info.name
3213     instance_list = self.cfg.GetInstanceList()
3214     if new_name in instance_list:
3215       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3216                                  new_name)
3217
3218     if not getattr(self.op, "ignore_ip", False):
3219       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3220         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3221                                    (name_info.ip, new_name))
3222
3223
3224   def Exec(self, feedback_fn):
3225     """Reinstall the instance.
3226
3227     """
3228     inst = self.instance
3229     old_name = inst.name
3230
3231     if inst.disk_template == constants.DT_FILE:
3232       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3233
3234     self.cfg.RenameInstance(inst.name, self.op.new_name)
3235     # Change the instance lock. This is definitely safe while we hold the BGL
3236     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3237     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3238
3239     # re-read the instance from the configuration after rename
3240     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3241
3242     if inst.disk_template == constants.DT_FILE:
3243       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3244       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3245                                                      old_file_storage_dir,
3246                                                      new_file_storage_dir)
3247       result.Raise()
3248       if not result.data:
3249         raise errors.OpExecError("Could not connect to node '%s' to rename"
3250                                  " directory '%s' to '%s' (but the instance"
3251                                  " has been renamed in Ganeti)" % (
3252                                  inst.primary_node, old_file_storage_dir,
3253                                  new_file_storage_dir))
3254
3255       if not result.data[0]:
3256         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3257                                  " (but the instance has been renamed in"
3258                                  " Ganeti)" % (old_file_storage_dir,
3259                                                new_file_storage_dir))
3260
3261     _StartInstanceDisks(self, inst, None)
3262     try:
3263       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3264                                                  old_name)
3265       msg = result.RemoteFailMsg()
3266       if msg:
3267         msg = ("Could not run OS rename script for instance %s on node %s"
3268                " (but the instance has been renamed in Ganeti): %s" %
3269                (inst.name, inst.primary_node, msg))
3270         self.proc.LogWarning(msg)
3271     finally:
3272       _ShutdownInstanceDisks(self, inst)
3273
3274
3275 class LURemoveInstance(LogicalUnit):
3276   """Remove an instance.
3277
3278   """
3279   HPATH = "instance-remove"
3280   HTYPE = constants.HTYPE_INSTANCE
3281   _OP_REQP = ["instance_name", "ignore_failures"]
3282   REQ_BGL = False
3283
3284   def ExpandNames(self):
3285     self._ExpandAndLockInstance()
3286     self.needed_locks[locking.LEVEL_NODE] = []
3287     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3288
3289   def DeclareLocks(self, level):
3290     if level == locking.LEVEL_NODE:
3291       self._LockInstancesNodes()
3292
3293   def BuildHooksEnv(self):
3294     """Build hooks env.
3295
3296     This runs on master, primary and secondary nodes of the instance.
3297
3298     """
3299     env = _BuildInstanceHookEnvByObject(self, self.instance)
3300     nl = [self.cfg.GetMasterNode()]
3301     return env, nl, nl
3302
3303   def CheckPrereq(self):
3304     """Check prerequisites.
3305
3306     This checks that the instance is in the cluster.
3307
3308     """
3309     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3310     assert self.instance is not None, \
3311       "Cannot retrieve locked instance %s" % self.op.instance_name
3312
3313   def Exec(self, feedback_fn):
3314     """Remove the instance.
3315
3316     """
3317     instance = self.instance
3318     logging.info("Shutting down instance %s on node %s",
3319                  instance.name, instance.primary_node)
3320
3321     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3322     msg = result.RemoteFailMsg()
3323     if msg:
3324       if self.op.ignore_failures:
3325         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3326       else:
3327         raise errors.OpExecError("Could not shutdown instance %s on"
3328                                  " node %s: %s" %
3329                                  (instance.name, instance.primary_node, msg))
3330
3331     logging.info("Removing block devices for instance %s", instance.name)
3332
3333     if not _RemoveDisks(self, instance):
3334       if self.op.ignore_failures:
3335         feedback_fn("Warning: can't remove instance's disks")
3336       else:
3337         raise errors.OpExecError("Can't remove instance's disks")
3338
3339     logging.info("Removing instance %s out of cluster config", instance.name)
3340
3341     self.cfg.RemoveInstance(instance.name)
3342     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3343
3344
3345 class LUQueryInstances(NoHooksLU):
3346   """Logical unit for querying instances.
3347
3348   """
3349   _OP_REQP = ["output_fields", "names", "use_locking"]
3350   REQ_BGL = False
3351   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3352                                     "admin_state",
3353                                     "disk_template", "ip", "mac", "bridge",
3354                                     "sda_size", "sdb_size", "vcpus", "tags",
3355                                     "network_port", "beparams",
3356                                     r"(disk)\.(size)/([0-9]+)",
3357                                     r"(disk)\.(sizes)", "disk_usage",
3358                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3359                                     r"(nic)\.(macs|ips|bridges)",
3360                                     r"(disk|nic)\.(count)",
3361                                     "serial_no", "hypervisor", "hvparams",] +
3362                                   ["hv/%s" % name
3363                                    for name in constants.HVS_PARAMETERS] +
3364                                   ["be/%s" % name
3365                                    for name in constants.BES_PARAMETERS])
3366   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3367
3368
3369   def ExpandNames(self):
3370     _CheckOutputFields(static=self._FIELDS_STATIC,
3371                        dynamic=self._FIELDS_DYNAMIC,
3372                        selected=self.op.output_fields)
3373
3374     self.needed_locks = {}
3375     self.share_locks[locking.LEVEL_INSTANCE] = 1
3376     self.share_locks[locking.LEVEL_NODE] = 1
3377
3378     if self.op.names:
3379       self.wanted = _GetWantedInstances(self, self.op.names)
3380     else:
3381       self.wanted = locking.ALL_SET
3382
3383     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3384     self.do_locking = self.do_node_query and self.op.use_locking
3385     if self.do_locking:
3386       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3387       self.needed_locks[locking.LEVEL_NODE] = []
3388       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3389
3390   def DeclareLocks(self, level):
3391     if level == locking.LEVEL_NODE and self.do_locking:
3392       self._LockInstancesNodes()
3393
3394   def CheckPrereq(self):
3395     """Check prerequisites.
3396
3397     """
3398     pass
3399
3400   def Exec(self, feedback_fn):
3401     """Computes the list of nodes and their attributes.
3402
3403     """
3404     all_info = self.cfg.GetAllInstancesInfo()
3405     if self.wanted == locking.ALL_SET:
3406       # caller didn't specify instance names, so ordering is not important
3407       if self.do_locking:
3408         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3409       else:
3410         instance_names = all_info.keys()
3411       instance_names = utils.NiceSort(instance_names)
3412     else:
3413       # caller did specify names, so we must keep the ordering
3414       if self.do_locking:
3415         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3416       else:
3417         tgt_set = all_info.keys()
3418       missing = set(self.wanted).difference(tgt_set)
3419       if missing:
3420         raise errors.OpExecError("Some instances were removed before"
3421                                  " retrieving their data: %s" % missing)
3422       instance_names = self.wanted
3423
3424     instance_list = [all_info[iname] for iname in instance_names]
3425
3426     # begin data gathering
3427
3428     nodes = frozenset([inst.primary_node for inst in instance_list])
3429     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3430
3431     bad_nodes = []
3432     off_nodes = []
3433     if self.do_node_query:
3434       live_data = {}
3435       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3436       for name in nodes:
3437         result = node_data[name]
3438         if result.offline:
3439           # offline nodes will be in both lists
3440           off_nodes.append(name)
3441         if result.failed:
3442           bad_nodes.append(name)
3443         else:
3444           if result.data:
3445             live_data.update(result.data)
3446             # else no instance is alive
3447     else:
3448       live_data = dict([(name, {}) for name in instance_names])
3449
3450     # end data gathering
3451
3452     HVPREFIX = "hv/"
3453     BEPREFIX = "be/"
3454     output = []
3455     for instance in instance_list:
3456       iout = []
3457       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3458       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3459       for field in self.op.output_fields:
3460         st_match = self._FIELDS_STATIC.Matches(field)
3461         if field == "name":
3462           val = instance.name
3463         elif field == "os":
3464           val = instance.os
3465         elif field == "pnode":
3466           val = instance.primary_node
3467         elif field == "snodes":
3468           val = list(instance.secondary_nodes)
3469         elif field == "admin_state":
3470           val = instance.admin_up
3471         elif field == "oper_state":
3472           if instance.primary_node in bad_nodes:
3473             val = None
3474           else:
3475             val = bool(live_data.get(instance.name))
3476         elif field == "status":
3477           if instance.primary_node in off_nodes:
3478             val = "ERROR_nodeoffline"
3479           elif instance.primary_node in bad_nodes:
3480             val = "ERROR_nodedown"
3481           else:
3482             running = bool(live_data.get(instance.name))
3483             if running:
3484               if instance.admin_up:
3485                 val = "running"
3486               else:
3487                 val = "ERROR_up"
3488             else:
3489               if instance.admin_up:
3490                 val = "ERROR_down"
3491               else:
3492                 val = "ADMIN_down"
3493         elif field == "oper_ram":
3494           if instance.primary_node in bad_nodes:
3495             val = None
3496           elif instance.name in live_data:
3497             val = live_data[instance.name].get("memory", "?")
3498           else:
3499             val = "-"
3500         elif field == "disk_template":
3501           val = instance.disk_template
3502         elif field == "ip":
3503           val = instance.nics[0].ip
3504         elif field == "bridge":
3505           val = instance.nics[0].bridge
3506         elif field == "mac":
3507           val = instance.nics[0].mac
3508         elif field == "sda_size" or field == "sdb_size":
3509           idx = ord(field[2]) - ord('a')
3510           try:
3511             val = instance.FindDisk(idx).size
3512           except errors.OpPrereqError:
3513             val = None
3514         elif field == "disk_usage": # total disk usage per node
3515           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3516           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3517         elif field == "tags":
3518           val = list(instance.GetTags())
3519         elif field == "serial_no":
3520           val = instance.serial_no
3521         elif field == "network_port":
3522           val = instance.network_port
3523         elif field == "hypervisor":
3524           val = instance.hypervisor
3525         elif field == "hvparams":
3526           val = i_hv
3527         elif (field.startswith(HVPREFIX) and
3528               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3529           val = i_hv.get(field[len(HVPREFIX):], None)
3530         elif field == "beparams":
3531           val = i_be
3532         elif (field.startswith(BEPREFIX) and
3533               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3534           val = i_be.get(field[len(BEPREFIX):], None)
3535         elif st_match and st_match.groups():
3536           # matches a variable list
3537           st_groups = st_match.groups()
3538           if st_groups and st_groups[0] == "disk":
3539             if st_groups[1] == "count":
3540               val = len(instance.disks)
3541             elif st_groups[1] == "sizes":
3542               val = [disk.size for disk in instance.disks]
3543             elif st_groups[1] == "size":
3544               try:
3545                 val = instance.FindDisk(st_groups[2]).size
3546               except errors.OpPrereqError:
3547                 val = None
3548             else:
3549               assert False, "Unhandled disk parameter"
3550           elif st_groups[0] == "nic":
3551             if st_groups[1] == "count":
3552               val = len(instance.nics)
3553             elif st_groups[1] == "macs":
3554               val = [nic.mac for nic in instance.nics]
3555             elif st_groups[1] == "ips":
3556               val = [nic.ip for nic in instance.nics]
3557             elif st_groups[1] == "bridges":
3558               val = [nic.bridge for nic in instance.nics]
3559             else:
3560               # index-based item
3561               nic_idx = int(st_groups[2])
3562               if nic_idx >= len(instance.nics):
3563                 val = None
3564               else:
3565                 if st_groups[1] == "mac":
3566                   val = instance.nics[nic_idx].mac
3567                 elif st_groups[1] == "ip":
3568                   val = instance.nics[nic_idx].ip
3569                 elif st_groups[1] == "bridge":
3570                   val = instance.nics[nic_idx].bridge
3571                 else:
3572                   assert False, "Unhandled NIC parameter"
3573           else:
3574             assert False, "Unhandled variable parameter"
3575         else:
3576           raise errors.ParameterError(field)
3577         iout.append(val)
3578       output.append(iout)
3579
3580     return output
3581
3582
3583 class LUFailoverInstance(LogicalUnit):
3584   """Failover an instance.
3585
3586   """
3587   HPATH = "instance-failover"
3588   HTYPE = constants.HTYPE_INSTANCE
3589   _OP_REQP = ["instance_name", "ignore_consistency"]
3590   REQ_BGL = False
3591
3592   def ExpandNames(self):
3593     self._ExpandAndLockInstance()
3594     self.needed_locks[locking.LEVEL_NODE] = []
3595     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3596
3597   def DeclareLocks(self, level):
3598     if level == locking.LEVEL_NODE:
3599       self._LockInstancesNodes()
3600
3601   def BuildHooksEnv(self):
3602     """Build hooks env.
3603
3604     This runs on master, primary and secondary nodes of the instance.
3605
3606     """
3607     env = {
3608       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3609       }
3610     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3611     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3612     return env, nl, nl
3613
3614   def CheckPrereq(self):
3615     """Check prerequisites.
3616
3617     This checks that the instance is in the cluster.
3618
3619     """
3620     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3621     assert self.instance is not None, \
3622       "Cannot retrieve locked instance %s" % self.op.instance_name
3623
3624     bep = self.cfg.GetClusterInfo().FillBE(instance)
3625     if instance.disk_template not in constants.DTS_NET_MIRROR:
3626       raise errors.OpPrereqError("Instance's disk layout is not"
3627                                  " network mirrored, cannot failover.")
3628
3629     secondary_nodes = instance.secondary_nodes
3630     if not secondary_nodes:
3631       raise errors.ProgrammerError("no secondary node but using "
3632                                    "a mirrored disk template")
3633
3634     target_node = secondary_nodes[0]
3635     _CheckNodeOnline(self, target_node)
3636     _CheckNodeNotDrained(self, target_node)
3637     # check memory requirements on the secondary node
3638     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3639                          instance.name, bep[constants.BE_MEMORY],
3640                          instance.hypervisor)
3641     # check bridge existance
3642     _CheckInstanceBridgesExist(self, instance, node=target_node)
3643
3644   def Exec(self, feedback_fn):
3645     """Failover an instance.
3646
3647     The failover is done by shutting it down on its present node and
3648     starting it on the secondary.
3649
3650     """
3651     instance = self.instance
3652
3653     source_node = instance.primary_node
3654     target_node = instance.secondary_nodes[0]
3655
3656     feedback_fn("* checking disk consistency between source and target")
3657     for dev in instance.disks:
3658       # for drbd, these are drbd over lvm
3659       if not _CheckDiskConsistency(self, dev, target_node, False):
3660         if instance.admin_up and not self.op.ignore_consistency:
3661           raise errors.OpExecError("Disk %s is degraded on target node,"
3662                                    " aborting failover." % dev.iv_name)
3663
3664     feedback_fn("* shutting down instance on source node")
3665     logging.info("Shutting down instance %s on node %s",
3666                  instance.name, source_node)
3667
3668     result = self.rpc.call_instance_shutdown(source_node, instance)
3669     msg = result.RemoteFailMsg()
3670     if msg:
3671       if self.op.ignore_consistency:
3672         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3673                              " Proceeding anyway. Please make sure node"
3674                              " %s is down. Error details: %s",
3675                              instance.name, source_node, source_node, msg)
3676       else:
3677         raise errors.OpExecError("Could not shutdown instance %s on"
3678                                  " node %s: %s" %
3679                                  (instance.name, source_node, msg))
3680
3681     feedback_fn("* deactivating the instance's disks on source node")
3682     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3683       raise errors.OpExecError("Can't shut down the instance's disks.")
3684
3685     instance.primary_node = target_node
3686     # distribute new instance config to the other nodes
3687     self.cfg.Update(instance)
3688
3689     # Only start the instance if it's marked as up
3690     if instance.admin_up:
3691       feedback_fn("* activating the instance's disks on target node")
3692       logging.info("Starting instance %s on node %s",
3693                    instance.name, target_node)
3694
3695       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3696                                                ignore_secondaries=True)
3697       if not disks_ok:
3698         _ShutdownInstanceDisks(self, instance)
3699         raise errors.OpExecError("Can't activate the instance's disks")
3700
3701       feedback_fn("* starting the instance on the target node")
3702       result = self.rpc.call_instance_start(target_node, instance, None, None)
3703       msg = result.RemoteFailMsg()
3704       if msg:
3705         _ShutdownInstanceDisks(self, instance)
3706         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3707                                  (instance.name, target_node, msg))
3708
3709
3710 class LUMigrateInstance(LogicalUnit):
3711   """Migrate an instance.
3712
3713   This is migration without shutting down, compared to the failover,
3714   which is done with shutdown.
3715
3716   """
3717   HPATH = "instance-migrate"
3718   HTYPE = constants.HTYPE_INSTANCE
3719   _OP_REQP = ["instance_name", "live", "cleanup"]
3720
3721   REQ_BGL = False
3722
3723   def ExpandNames(self):
3724     self._ExpandAndLockInstance()
3725     self.needed_locks[locking.LEVEL_NODE] = []
3726     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3727
3728   def DeclareLocks(self, level):
3729     if level == locking.LEVEL_NODE:
3730       self._LockInstancesNodes()
3731
3732   def BuildHooksEnv(self):
3733     """Build hooks env.
3734
3735     This runs on master, primary and secondary nodes of the instance.
3736
3737     """
3738     env = _BuildInstanceHookEnvByObject(self, self.instance)
3739     env["MIGRATE_LIVE"] = self.op.live
3740     env["MIGRATE_CLEANUP"] = self.op.cleanup
3741     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3742     return env, nl, nl
3743
3744   def CheckPrereq(self):
3745     """Check prerequisites.
3746
3747     This checks that the instance is in the cluster.
3748
3749     """
3750     instance = self.cfg.GetInstanceInfo(
3751       self.cfg.ExpandInstanceName(self.op.instance_name))
3752     if instance is None:
3753       raise errors.OpPrereqError("Instance '%s' not known" %
3754                                  self.op.instance_name)
3755
3756     if instance.disk_template != constants.DT_DRBD8:
3757       raise errors.OpPrereqError("Instance's disk layout is not"
3758                                  " drbd8, cannot migrate.")
3759
3760     secondary_nodes = instance.secondary_nodes
3761     if not secondary_nodes:
3762       raise errors.ConfigurationError("No secondary node but using"
3763                                       " drbd8 disk template")
3764
3765     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3766
3767     target_node = secondary_nodes[0]
3768     # check memory requirements on the secondary node
3769     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3770                          instance.name, i_be[constants.BE_MEMORY],
3771                          instance.hypervisor)
3772
3773     # check bridge existance
3774     _CheckInstanceBridgesExist(self, instance, node=target_node)
3775
3776     if not self.op.cleanup:
3777       _CheckNodeNotDrained(self, target_node)
3778       result = self.rpc.call_instance_migratable(instance.primary_node,
3779                                                  instance)
3780       msg = result.RemoteFailMsg()
3781       if msg:
3782         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3783                                    msg)
3784
3785     self.instance = instance
3786
3787   def _WaitUntilSync(self):
3788     """Poll with custom rpc for disk sync.
3789
3790     This uses our own step-based rpc call.
3791
3792     """
3793     self.feedback_fn("* wait until resync is done")
3794     all_done = False
3795     while not all_done:
3796       all_done = True
3797       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3798                                             self.nodes_ip,
3799                                             self.instance.disks)
3800       min_percent = 100
3801       for node, nres in result.items():
3802         msg = nres.RemoteFailMsg()
3803         if msg:
3804           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3805                                    (node, msg))
3806         node_done, node_percent = nres.payload
3807         all_done = all_done and node_done
3808         if node_percent is not None:
3809           min_percent = min(min_percent, node_percent)
3810       if not all_done:
3811         if min_percent < 100:
3812           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3813         time.sleep(2)
3814
3815   def _EnsureSecondary(self, node):
3816     """Demote a node to secondary.
3817
3818     """
3819     self.feedback_fn("* switching node %s to secondary mode" % node)
3820
3821     for dev in self.instance.disks:
3822       self.cfg.SetDiskID(dev, node)
3823
3824     result = self.rpc.call_blockdev_close(node, self.instance.name,
3825                                           self.instance.disks)
3826     msg = result.RemoteFailMsg()
3827     if msg:
3828       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3829                                " error %s" % (node, msg))
3830
3831   def _GoStandalone(self):
3832     """Disconnect from the network.
3833
3834     """
3835     self.feedback_fn("* changing into standalone mode")
3836     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3837                                                self.instance.disks)
3838     for node, nres in result.items():
3839       msg = nres.RemoteFailMsg()
3840       if msg:
3841         raise errors.OpExecError("Cannot disconnect disks node %s,"
3842                                  " error %s" % (node, msg))
3843
3844   def _GoReconnect(self, multimaster):
3845     """Reconnect to the network.
3846
3847     """
3848     if multimaster:
3849       msg = "dual-master"
3850     else:
3851       msg = "single-master"
3852     self.feedback_fn("* changing disks into %s mode" % msg)
3853     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3854                                            self.instance.disks,
3855                                            self.instance.name, multimaster)
3856     for node, nres in result.items():
3857       msg = nres.RemoteFailMsg()
3858       if msg:
3859         raise errors.OpExecError("Cannot change disks config on node %s,"
3860                                  " error: %s" % (node, msg))
3861
3862   def _ExecCleanup(self):
3863     """Try to cleanup after a failed migration.
3864
3865     The cleanup is done by:
3866       - check that the instance is running only on one node
3867         (and update the config if needed)
3868       - change disks on its secondary node to secondary
3869       - wait until disks are fully synchronized
3870       - disconnect from the network
3871       - change disks into single-master mode
3872       - wait again until disks are fully synchronized
3873
3874     """
3875     instance = self.instance
3876     target_node = self.target_node
3877     source_node = self.source_node
3878
3879     # check running on only one node
3880     self.feedback_fn("* checking where the instance actually runs"
3881                      " (if this hangs, the hypervisor might be in"
3882                      " a bad state)")
3883     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3884     for node, result in ins_l.items():
3885       result.Raise()
3886       if not isinstance(result.data, list):
3887         raise errors.OpExecError("Can't contact node '%s'" % node)
3888
3889     runningon_source = instance.name in ins_l[source_node].data
3890     runningon_target = instance.name in ins_l[target_node].data
3891
3892     if runningon_source and runningon_target:
3893       raise errors.OpExecError("Instance seems to be running on two nodes,"
3894                                " or the hypervisor is confused. You will have"
3895                                " to ensure manually that it runs only on one"
3896                                " and restart this operation.")
3897
3898     if not (runningon_source or runningon_target):
3899       raise errors.OpExecError("Instance does not seem to be running at all."
3900                                " In this case, it's safer to repair by"
3901                                " running 'gnt-instance stop' to ensure disk"
3902                                " shutdown, and then restarting it.")
3903
3904     if runningon_target:
3905       # the migration has actually succeeded, we need to update the config
3906       self.feedback_fn("* instance running on secondary node (%s),"
3907                        " updating config" % target_node)
3908       instance.primary_node = target_node
3909       self.cfg.Update(instance)
3910       demoted_node = source_node
3911     else:
3912       self.feedback_fn("* instance confirmed to be running on its"
3913                        " primary node (%s)" % source_node)
3914       demoted_node = target_node
3915
3916     self._EnsureSecondary(demoted_node)
3917     try:
3918       self._WaitUntilSync()
3919     except errors.OpExecError:
3920       # we ignore here errors, since if the device is standalone, it
3921       # won't be able to sync
3922       pass
3923     self._GoStandalone()
3924     self._GoReconnect(False)
3925     self._WaitUntilSync()
3926
3927     self.feedback_fn("* done")
3928
3929   def _RevertDiskStatus(self):
3930     """Try to revert the disk status after a failed migration.
3931
3932     """
3933     target_node = self.target_node
3934     try:
3935       self._EnsureSecondary(target_node)
3936       self._GoStandalone()
3937       self._GoReconnect(False)
3938       self._WaitUntilSync()
3939     except errors.OpExecError, err:
3940       self.LogWarning("Migration failed and I can't reconnect the"
3941                       " drives: error '%s'\n"
3942                       "Please look and recover the instance status" %
3943                       str(err))
3944
3945   def _AbortMigration(self):
3946     """Call the hypervisor code to abort a started migration.
3947
3948     """
3949     instance = self.instance
3950     target_node = self.target_node
3951     migration_info = self.migration_info
3952
3953     abort_result = self.rpc.call_finalize_migration(target_node,
3954                                                     instance,
3955                                                     migration_info,
3956                                                     False)
3957     abort_msg = abort_result.RemoteFailMsg()
3958     if abort_msg:
3959       logging.error("Aborting migration failed on target node %s: %s" %
3960                     (target_node, abort_msg))
3961       # Don't raise an exception here, as we stil have to try to revert the
3962       # disk status, even if this step failed.
3963
3964   def _ExecMigration(self):
3965     """Migrate an instance.
3966
3967     The migrate is done by:
3968       - change the disks into dual-master mode
3969       - wait until disks are fully synchronized again
3970       - migrate the instance
3971       - change disks on the new secondary node (the old primary) to secondary
3972       - wait until disks are fully synchronized
3973       - change disks into single-master mode
3974
3975     """
3976     instance = self.instance
3977     target_node = self.target_node
3978     source_node = self.source_node
3979
3980     self.feedback_fn("* checking disk consistency between source and target")
3981     for dev in instance.disks:
3982       if not _CheckDiskConsistency(self, dev, target_node, False):
3983         raise errors.OpExecError("Disk %s is degraded or not fully"
3984                                  " synchronized on target node,"
3985                                  " aborting migrate." % dev.iv_name)
3986
3987     # First get the migration information from the remote node
3988     result = self.rpc.call_migration_info(source_node, instance)
3989     msg = result.RemoteFailMsg()
3990     if msg:
3991       log_err = ("Failed fetching source migration information from %s: %s" %
3992                  (source_node, msg))
3993       logging.error(log_err)
3994       raise errors.OpExecError(log_err)
3995
3996     self.migration_info = migration_info = result.payload
3997
3998     # Then switch the disks to master/master mode
3999     self._EnsureSecondary(target_node)
4000     self._GoStandalone()
4001     self._GoReconnect(True)
4002     self._WaitUntilSync()
4003
4004     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4005     result = self.rpc.call_accept_instance(target_node,
4006                                            instance,
4007                                            migration_info,
4008                                            self.nodes_ip[target_node])
4009
4010     msg = result.RemoteFailMsg()
4011     if msg:
4012       logging.error("Instance pre-migration failed, trying to revert"
4013                     " disk status: %s", msg)
4014       self._AbortMigration()
4015       self._RevertDiskStatus()
4016       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4017                                (instance.name, msg))
4018
4019     self.feedback_fn("* migrating instance to %s" % target_node)
4020     time.sleep(10)
4021     result = self.rpc.call_instance_migrate(source_node, instance,
4022                                             self.nodes_ip[target_node],
4023                                             self.op.live)
4024     msg = result.RemoteFailMsg()
4025     if msg:
4026       logging.error("Instance migration failed, trying to revert"
4027                     " disk status: %s", msg)
4028       self._AbortMigration()
4029       self._RevertDiskStatus()
4030       raise errors.OpExecError("Could not migrate instance %s: %s" %
4031                                (instance.name, msg))
4032     time.sleep(10)
4033
4034     instance.primary_node = target_node
4035     # distribute new instance config to the other nodes
4036     self.cfg.Update(instance)
4037
4038     result = self.rpc.call_finalize_migration(target_node,
4039                                               instance,
4040                                               migration_info,
4041                                               True)
4042     msg = result.RemoteFailMsg()
4043     if msg:
4044       logging.error("Instance migration succeeded, but finalization failed:"
4045                     " %s" % msg)
4046       raise errors.OpExecError("Could not finalize instance migration: %s" %
4047                                msg)
4048
4049     self._EnsureSecondary(source_node)
4050     self._WaitUntilSync()
4051     self._GoStandalone()
4052     self._GoReconnect(False)
4053     self._WaitUntilSync()
4054
4055     self.feedback_fn("* done")
4056
4057   def Exec(self, feedback_fn):
4058     """Perform the migration.
4059
4060     """
4061     self.feedback_fn = feedback_fn
4062
4063     self.source_node = self.instance.primary_node
4064     self.target_node = self.instance.secondary_nodes[0]
4065     self.all_nodes = [self.source_node, self.target_node]
4066     self.nodes_ip = {
4067       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4068       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4069       }
4070     if self.op.cleanup:
4071       return self._ExecCleanup()
4072     else:
4073       return self._ExecMigration()
4074
4075
4076 def _CreateBlockDev(lu, node, instance, device, force_create,
4077                     info, force_open):
4078   """Create a tree of block devices on a given node.
4079
4080   If this device type has to be created on secondaries, create it and
4081   all its children.
4082
4083   If not, just recurse to children keeping the same 'force' value.
4084
4085   @param lu: the lu on whose behalf we execute
4086   @param node: the node on which to create the device
4087   @type instance: L{objects.Instance}
4088   @param instance: the instance which owns the device
4089   @type device: L{objects.Disk}
4090   @param device: the device to create
4091   @type force_create: boolean
4092   @param force_create: whether to force creation of this device; this
4093       will be change to True whenever we find a device which has
4094       CreateOnSecondary() attribute
4095   @param info: the extra 'metadata' we should attach to the device
4096       (this will be represented as a LVM tag)
4097   @type force_open: boolean
4098   @param force_open: this parameter will be passes to the
4099       L{backend.BlockdevCreate} function where it specifies
4100       whether we run on primary or not, and it affects both
4101       the child assembly and the device own Open() execution
4102
4103   """
4104   if device.CreateOnSecondary():
4105     force_create = True
4106
4107   if device.children:
4108     for child in device.children:
4109       _CreateBlockDev(lu, node, instance, child, force_create,
4110                       info, force_open)
4111
4112   if not force_create:
4113     return
4114
4115   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4116
4117
4118 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4119   """Create a single block device on a given node.
4120
4121   This will not recurse over children of the device, so they must be
4122   created in advance.
4123
4124   @param lu: the lu on whose behalf we execute
4125   @param node: the node on which to create the device
4126   @type instance: L{objects.Instance}
4127   @param instance: the instance which owns the device
4128   @type device: L{objects.Disk}
4129   @param device: the device to create
4130   @param info: the extra 'metadata' we should attach to the device
4131       (this will be represented as a LVM tag)
4132   @type force_open: boolean
4133   @param force_open: this parameter will be passes to the
4134       L{backend.BlockdevCreate} function where it specifies
4135       whether we run on primary or not, and it affects both
4136       the child assembly and the device own Open() execution
4137
4138   """
4139   lu.cfg.SetDiskID(device, node)
4140   result = lu.rpc.call_blockdev_create(node, device, device.size,
4141                                        instance.name, force_open, info)
4142   msg = result.RemoteFailMsg()
4143   if msg:
4144     raise errors.OpExecError("Can't create block device %s on"
4145                              " node %s for instance %s: %s" %
4146                              (device, node, instance.name, msg))
4147   if device.physical_id is None:
4148     device.physical_id = result.payload
4149
4150
4151 def _GenerateUniqueNames(lu, exts):
4152   """Generate a suitable LV name.
4153
4154   This will generate a logical volume name for the given instance.
4155
4156   """
4157   results = []
4158   for val in exts:
4159     new_id = lu.cfg.GenerateUniqueID()
4160     results.append("%s%s" % (new_id, val))
4161   return results
4162
4163
4164 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4165                          p_minor, s_minor):
4166   """Generate a drbd8 device complete with its children.
4167
4168   """
4169   port = lu.cfg.AllocatePort()
4170   vgname = lu.cfg.GetVGName()
4171   shared_secret = lu.cfg.GenerateDRBDSecret()
4172   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4173                           logical_id=(vgname, names[0]))
4174   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4175                           logical_id=(vgname, names[1]))
4176   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4177                           logical_id=(primary, secondary, port,
4178                                       p_minor, s_minor,
4179                                       shared_secret),
4180                           children=[dev_data, dev_meta],
4181                           iv_name=iv_name)
4182   return drbd_dev
4183
4184
4185 def _GenerateDiskTemplate(lu, template_name,
4186                           instance_name, primary_node,
4187                           secondary_nodes, disk_info,
4188                           file_storage_dir, file_driver,
4189                           base_index):
4190   """Generate the entire disk layout for a given template type.
4191
4192   """
4193   #TODO: compute space requirements
4194
4195   vgname = lu.cfg.GetVGName()
4196   disk_count = len(disk_info)
4197   disks = []
4198   if template_name == constants.DT_DISKLESS:
4199     pass
4200   elif template_name == constants.DT_PLAIN:
4201     if len(secondary_nodes) != 0:
4202       raise errors.ProgrammerError("Wrong template configuration")
4203
4204     names = _GenerateUniqueNames(lu, [".disk%d" % i
4205                                       for i in range(disk_count)])
4206     for idx, disk in enumerate(disk_info):
4207       disk_index = idx + base_index
4208       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4209                               logical_id=(vgname, names[idx]),
4210                               iv_name="disk/%d" % disk_index,
4211                               mode=disk["mode"])
4212       disks.append(disk_dev)
4213   elif template_name == constants.DT_DRBD8:
4214     if len(secondary_nodes) != 1:
4215       raise errors.ProgrammerError("Wrong template configuration")
4216     remote_node = secondary_nodes[0]
4217     minors = lu.cfg.AllocateDRBDMinor(
4218       [primary_node, remote_node] * len(disk_info), instance_name)
4219
4220     names = []
4221     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4222                                                for i in range(disk_count)]):
4223       names.append(lv_prefix + "_data")
4224       names.append(lv_prefix + "_meta")
4225     for idx, disk in enumerate(disk_info):
4226       disk_index = idx + base_index
4227       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4228                                       disk["size"], names[idx*2:idx*2+2],
4229                                       "disk/%d" % disk_index,
4230                                       minors[idx*2], minors[idx*2+1])
4231       disk_dev.mode = disk["mode"]
4232       disks.append(disk_dev)
4233   elif template_name == constants.DT_FILE:
4234     if len(secondary_nodes) != 0:
4235       raise errors.ProgrammerError("Wrong template configuration")
4236
4237     for idx, disk in enumerate(disk_info):
4238       disk_index = idx + base_index
4239       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4240                               iv_name="disk/%d" % disk_index,
4241                               logical_id=(file_driver,
4242                                           "%s/disk%d" % (file_storage_dir,
4243                                                          disk_index)),
4244                               mode=disk["mode"])
4245       disks.append(disk_dev)
4246   else:
4247     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4248   return disks
4249
4250
4251 def _GetInstanceInfoText(instance):
4252   """Compute that text that should be added to the disk's metadata.
4253
4254   """
4255   return "originstname+%s" % instance.name
4256
4257
4258 def _CreateDisks(lu, instance):
4259   """Create all disks for an instance.
4260
4261   This abstracts away some work from AddInstance.
4262
4263   @type lu: L{LogicalUnit}
4264   @param lu: the logical unit on whose behalf we execute
4265   @type instance: L{objects.Instance}
4266   @param instance: the instance whose disks we should create
4267   @rtype: boolean
4268   @return: the success of the creation
4269
4270   """
4271   info = _GetInstanceInfoText(instance)
4272   pnode = instance.primary_node
4273
4274   if instance.disk_template == constants.DT_FILE:
4275     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4276     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4277
4278     if result.failed or not result.data:
4279       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4280
4281     if not result.data[0]:
4282       raise errors.OpExecError("Failed to create directory '%s'" %
4283                                file_storage_dir)
4284
4285   # Note: this needs to be kept in sync with adding of disks in
4286   # LUSetInstanceParams
4287   for device in instance.disks:
4288     logging.info("Creating volume %s for instance %s",
4289                  device.iv_name, instance.name)
4290     #HARDCODE
4291     for node in instance.all_nodes:
4292       f_create = node == pnode
4293       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4294
4295
4296 def _RemoveDisks(lu, instance):
4297   """Remove all disks for an instance.
4298
4299   This abstracts away some work from `AddInstance()` and
4300   `RemoveInstance()`. Note that in case some of the devices couldn't
4301   be removed, the removal will continue with the other ones (compare
4302   with `_CreateDisks()`).
4303
4304   @type lu: L{LogicalUnit}
4305   @param lu: the logical unit on whose behalf we execute
4306   @type instance: L{objects.Instance}
4307   @param instance: the instance whose disks we should remove
4308   @rtype: boolean
4309   @return: the success of the removal
4310
4311   """
4312   logging.info("Removing block devices for instance %s", instance.name)
4313
4314   all_result = True
4315   for device in instance.disks:
4316     for node, disk in device.ComputeNodeTree(instance.primary_node):
4317       lu.cfg.SetDiskID(disk, node)
4318       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4319       if msg:
4320         lu.LogWarning("Could not remove block device %s on node %s,"
4321                       " continuing anyway: %s", device.iv_name, node, msg)
4322         all_result = False
4323
4324   if instance.disk_template == constants.DT_FILE:
4325     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4326     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4327                                                  file_storage_dir)
4328     if result.failed or not result.data:
4329       logging.error("Could not remove directory '%s'", file_storage_dir)
4330       all_result = False
4331
4332   return all_result
4333
4334
4335 def _ComputeDiskSize(disk_template, disks):
4336   """Compute disk size requirements in the volume group
4337
4338   """
4339   # Required free disk space as a function of disk and swap space
4340   req_size_dict = {
4341     constants.DT_DISKLESS: None,
4342     constants.DT_PLAIN: sum(d["size"] for d in disks),
4343     # 128 MB are added for drbd metadata for each disk
4344     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4345     constants.DT_FILE: None,
4346   }
4347
4348   if disk_template not in req_size_dict:
4349     raise errors.ProgrammerError("Disk template '%s' size requirement"
4350                                  " is unknown" %  disk_template)
4351
4352   return req_size_dict[disk_template]
4353
4354
4355 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4356   """Hypervisor parameter validation.
4357
4358   This function abstract the hypervisor parameter validation to be
4359   used in both instance create and instance modify.
4360
4361   @type lu: L{LogicalUnit}
4362   @param lu: the logical unit for which we check
4363   @type nodenames: list
4364   @param nodenames: the list of nodes on which we should check
4365   @type hvname: string
4366   @param hvname: the name of the hypervisor we should use
4367   @type hvparams: dict
4368   @param hvparams: the parameters which we need to check
4369   @raise errors.OpPrereqError: if the parameters are not valid
4370
4371   """
4372   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4373                                                   hvname,
4374                                                   hvparams)
4375   for node in nodenames:
4376     info = hvinfo[node]
4377     if info.offline:
4378       continue
4379     msg = info.RemoteFailMsg()
4380     if msg:
4381       raise errors.OpPrereqError("Hypervisor parameter validation"
4382                                  " failed on node %s: %s" % (node, msg))
4383
4384
4385 class LUCreateInstance(LogicalUnit):
4386   """Create an instance.
4387
4388   """
4389   HPATH = "instance-add"
4390   HTYPE = constants.HTYPE_INSTANCE
4391   _OP_REQP = ["instance_name", "disks", "disk_template",
4392               "mode", "start",
4393               "wait_for_sync", "ip_check", "nics",
4394               "hvparams", "beparams"]
4395   REQ_BGL = False
4396
4397   def _ExpandNode(self, node):
4398     """Expands and checks one node name.
4399
4400     """
4401     node_full = self.cfg.ExpandNodeName(node)
4402     if node_full is None:
4403       raise errors.OpPrereqError("Unknown node %s" % node)
4404     return node_full
4405
4406   def ExpandNames(self):
4407     """ExpandNames for CreateInstance.
4408
4409     Figure out the right locks for instance creation.
4410
4411     """
4412     self.needed_locks = {}
4413
4414     # set optional parameters to none if they don't exist
4415     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4416       if not hasattr(self.op, attr):
4417         setattr(self.op, attr, None)
4418
4419     # cheap checks, mostly valid constants given
4420
4421     # verify creation mode
4422     if self.op.mode not in (constants.INSTANCE_CREATE,
4423                             constants.INSTANCE_IMPORT):
4424       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4425                                  self.op.mode)
4426
4427     # disk template and mirror node verification
4428     if self.op.disk_template not in constants.DISK_TEMPLATES:
4429       raise errors.OpPrereqError("Invalid disk template name")
4430
4431     if self.op.hypervisor is None:
4432       self.op.hypervisor = self.cfg.GetHypervisorType()
4433
4434     cluster = self.cfg.GetClusterInfo()
4435     enabled_hvs = cluster.enabled_hypervisors
4436     if self.op.hypervisor not in enabled_hvs:
4437       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4438                                  " cluster (%s)" % (self.op.hypervisor,
4439                                   ",".join(enabled_hvs)))
4440
4441     # check hypervisor parameter syntax (locally)
4442     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4443     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4444                                   self.op.hvparams)
4445     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4446     hv_type.CheckParameterSyntax(filled_hvp)
4447
4448     # fill and remember the beparams dict
4449     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4450     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4451                                     self.op.beparams)
4452
4453     #### instance parameters check
4454
4455     # instance name verification
4456     hostname1 = utils.HostInfo(self.op.instance_name)
4457     self.op.instance_name = instance_name = hostname1.name
4458
4459     # this is just a preventive check, but someone might still add this
4460     # instance in the meantime, and creation will fail at lock-add time
4461     if instance_name in self.cfg.GetInstanceList():
4462       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4463                                  instance_name)
4464
4465     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4466
4467     # NIC buildup
4468     self.nics = []
4469     for idx, nic in enumerate(self.op.nics):
4470       nic_mode_req = nic.get("mode", None)
4471       nic_mode = nic_mode_req
4472       if nic_mode is None:
4473         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4474
4475       # in routed mode, for the first nic, the default ip is 'auto'
4476       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4477         default_ip_mode = constants.VALUE_AUTO
4478       else:
4479         default_ip_mode = constants.VALUE_NONE
4480
4481       # ip validity checks
4482       ip = nic.get("ip", default_ip_mode)
4483       if ip is None or ip.lower() == constants.VALUE_NONE:
4484         nic_ip = None
4485       elif ip.lower() == constants.VALUE_AUTO:
4486         nic_ip = hostname1.ip
4487       else:
4488         if not utils.IsValidIP(ip):
4489           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4490                                      " like a valid IP" % ip)
4491         nic_ip = ip
4492
4493       # TODO: check the ip for uniqueness !!
4494       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4495         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4496
4497       # MAC address verification
4498       mac = nic.get("mac", constants.VALUE_AUTO)
4499       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4500         if not utils.IsValidMac(mac.lower()):
4501           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4502                                      mac)
4503       # bridge verification
4504       bridge = nic.get("bridge", None)
4505       link = nic.get("link", None)
4506       if bridge and link:
4507         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4508       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4509         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4510       elif bridge:
4511         link = bridge
4512
4513       nicparams = {}
4514       if nic_mode_req:
4515         nicparams[constants.NIC_MODE] = nic_mode_req
4516       if link:
4517         nicparams[constants.NIC_LINK] = link
4518
4519       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4520                                       nicparams)
4521       objects.NIC.CheckParameterSyntax(check_params)
4522       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4523
4524     # disk checks/pre-build
4525     self.disks = []
4526     for disk in self.op.disks:
4527       mode = disk.get("mode", constants.DISK_RDWR)
4528       if mode not in constants.DISK_ACCESS_SET:
4529         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4530                                    mode)
4531       size = disk.get("size", None)
4532       if size is None:
4533         raise errors.OpPrereqError("Missing disk size")
4534       try:
4535         size = int(size)
4536       except ValueError:
4537         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4538       self.disks.append({"size": size, "mode": mode})
4539
4540     # used in CheckPrereq for ip ping check
4541     self.check_ip = hostname1.ip
4542
4543     # file storage checks
4544     if (self.op.file_driver and
4545         not self.op.file_driver in constants.FILE_DRIVER):
4546       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4547                                  self.op.file_driver)
4548
4549     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4550       raise errors.OpPrereqError("File storage directory path not absolute")
4551
4552     ### Node/iallocator related checks
4553     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4554       raise errors.OpPrereqError("One and only one of iallocator and primary"
4555                                  " node must be given")
4556
4557     if self.op.iallocator:
4558       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4559     else:
4560       self.op.pnode = self._ExpandNode(self.op.pnode)
4561       nodelist = [self.op.pnode]
4562       if self.op.snode is not None:
4563         self.op.snode = self._ExpandNode(self.op.snode)
4564         nodelist.append(self.op.snode)
4565       self.needed_locks[locking.LEVEL_NODE] = nodelist
4566
4567     # in case of import lock the source node too
4568     if self.op.mode == constants.INSTANCE_IMPORT:
4569       src_node = getattr(self.op, "src_node", None)
4570       src_path = getattr(self.op, "src_path", None)
4571
4572       if src_path is None:
4573         self.op.src_path = src_path = self.op.instance_name
4574
4575       if src_node is None:
4576         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4577         self.op.src_node = None
4578         if os.path.isabs(src_path):
4579           raise errors.OpPrereqError("Importing an instance from an absolute"
4580                                      " path requires a source node option.")
4581       else:
4582         self.op.src_node = src_node = self._ExpandNode(src_node)
4583         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4584           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4585         if not os.path.isabs(src_path):
4586           self.op.src_path = src_path = \
4587             os.path.join(constants.EXPORT_DIR, src_path)
4588
4589     else: # INSTANCE_CREATE
4590       if getattr(self.op, "os_type", None) is None:
4591         raise errors.OpPrereqError("No guest OS specified")
4592
4593   def _RunAllocator(self):
4594     """Run the allocator based on input opcode.
4595
4596     """
4597     nics = [n.ToDict() for n in self.nics]
4598     ial = IAllocator(self,
4599                      mode=constants.IALLOCATOR_MODE_ALLOC,
4600                      name=self.op.instance_name,
4601                      disk_template=self.op.disk_template,
4602                      tags=[],
4603                      os=self.op.os_type,
4604                      vcpus=self.be_full[constants.BE_VCPUS],
4605                      mem_size=self.be_full[constants.BE_MEMORY],
4606                      disks=self.disks,
4607                      nics=nics,
4608                      hypervisor=self.op.hypervisor,
4609                      )
4610
4611     ial.Run(self.op.iallocator)
4612
4613     if not ial.success:
4614       raise errors.OpPrereqError("Can't compute nodes using"
4615                                  " iallocator '%s': %s" % (self.op.iallocator,
4616                                                            ial.info))
4617     if len(ial.nodes) != ial.required_nodes:
4618       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4619                                  " of nodes (%s), required %s" %
4620                                  (self.op.iallocator, len(ial.nodes),
4621                                   ial.required_nodes))
4622     self.op.pnode = ial.nodes[0]
4623     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4624                  self.op.instance_name, self.op.iallocator,
4625                  ", ".join(ial.nodes))
4626     if ial.required_nodes == 2:
4627       self.op.snode = ial.nodes[1]
4628
4629   def BuildHooksEnv(self):
4630     """Build hooks env.
4631
4632     This runs on master, primary and secondary nodes of the instance.
4633
4634     """
4635     env = {
4636       "ADD_MODE": self.op.mode,
4637       }
4638     if self.op.mode == constants.INSTANCE_IMPORT:
4639       env["SRC_NODE"] = self.op.src_node
4640       env["SRC_PATH"] = self.op.src_path
4641       env["SRC_IMAGES"] = self.src_images
4642
4643     env.update(_BuildInstanceHookEnv(
4644       name=self.op.instance_name,
4645       primary_node=self.op.pnode,
4646       secondary_nodes=self.secondaries,
4647       status=self.op.start,
4648       os_type=self.op.os_type,
4649       memory=self.be_full[constants.BE_MEMORY],
4650       vcpus=self.be_full[constants.BE_VCPUS],
4651       nics=_PreBuildNICHooksList(self, self.nics),
4652       disk_template=self.op.disk_template,
4653       disks=[(d["size"], d["mode"]) for d in self.disks],
4654     ))
4655
4656     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4657           self.secondaries)
4658     return env, nl, nl
4659
4660
4661   def CheckPrereq(self):
4662     """Check prerequisites.
4663
4664     """
4665     if (not self.cfg.GetVGName() and
4666         self.op.disk_template not in constants.DTS_NOT_LVM):
4667       raise errors.OpPrereqError("Cluster does not support lvm-based"
4668                                  " instances")
4669
4670     if self.op.mode == constants.INSTANCE_IMPORT:
4671       src_node = self.op.src_node
4672       src_path = self.op.src_path
4673
4674       if src_node is None:
4675         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4676         exp_list = self.rpc.call_export_list(locked_nodes)
4677         found = False
4678         for node in exp_list:
4679           if exp_list[node].RemoteFailMsg():
4680             continue
4681           if src_path in exp_list[node].payload:
4682             found = True
4683             self.op.src_node = src_node = node
4684             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4685                                                        src_path)
4686             break
4687         if not found:
4688           raise errors.OpPrereqError("No export found for relative path %s" %
4689                                       src_path)
4690
4691       _CheckNodeOnline(self, src_node)
4692       result = self.rpc.call_export_info(src_node, src_path)
4693       msg = result.RemoteFailMsg()
4694       if msg:
4695         raise errors.OpPrereqError("No export or invalid export found in"
4696                                    " dir %s: %s" % (src_path, msg))
4697
4698       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4699       if not export_info.has_section(constants.INISECT_EXP):
4700         raise errors.ProgrammerError("Corrupted export config")
4701
4702       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4703       if (int(ei_version) != constants.EXPORT_VERSION):
4704         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4705                                    (ei_version, constants.EXPORT_VERSION))
4706
4707       # Check that the new instance doesn't have less disks than the export
4708       instance_disks = len(self.disks)
4709       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4710       if instance_disks < export_disks:
4711         raise errors.OpPrereqError("Not enough disks to import."
4712                                    " (instance: %d, export: %d)" %
4713                                    (instance_disks, export_disks))
4714
4715       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4716       disk_images = []
4717       for idx in range(export_disks):
4718         option = 'disk%d_dump' % idx
4719         if export_info.has_option(constants.INISECT_INS, option):
4720           # FIXME: are the old os-es, disk sizes, etc. useful?
4721           export_name = export_info.get(constants.INISECT_INS, option)
4722           image = os.path.join(src_path, export_name)
4723           disk_images.append(image)
4724         else:
4725           disk_images.append(False)
4726
4727       self.src_images = disk_images
4728
4729       old_name = export_info.get(constants.INISECT_INS, 'name')
4730       # FIXME: int() here could throw a ValueError on broken exports
4731       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4732       if self.op.instance_name == old_name:
4733         for idx, nic in enumerate(self.nics):
4734           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4735             nic_mac_ini = 'nic%d_mac' % idx
4736             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4737
4738     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4739     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4740     if self.op.start and not self.op.ip_check:
4741       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4742                                  " adding an instance in start mode")
4743
4744     if self.op.ip_check:
4745       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4746         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4747                                    (self.check_ip, self.op.instance_name))
4748
4749     #### mac address generation
4750     # By generating here the mac address both the allocator and the hooks get
4751     # the real final mac address rather than the 'auto' or 'generate' value.
4752     # There is a race condition between the generation and the instance object
4753     # creation, which means that we know the mac is valid now, but we're not
4754     # sure it will be when we actually add the instance. If things go bad
4755     # adding the instance will abort because of a duplicate mac, and the
4756     # creation job will fail.
4757     for nic in self.nics:
4758       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4759         nic.mac = self.cfg.GenerateMAC()
4760
4761     #### allocator run
4762
4763     if self.op.iallocator is not None:
4764       self._RunAllocator()
4765
4766     #### node related checks
4767
4768     # check primary node
4769     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4770     assert self.pnode is not None, \
4771       "Cannot retrieve locked node %s" % self.op.pnode
4772     if pnode.offline:
4773       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4774                                  pnode.name)
4775     if pnode.drained:
4776       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4777                                  pnode.name)
4778
4779     self.secondaries = []
4780
4781     # mirror node verification
4782     if self.op.disk_template in constants.DTS_NET_MIRROR:
4783       if self.op.snode is None:
4784         raise errors.OpPrereqError("The networked disk templates need"
4785                                    " a mirror node")
4786       if self.op.snode == pnode.name:
4787         raise errors.OpPrereqError("The secondary node cannot be"
4788                                    " the primary node.")
4789       _CheckNodeOnline(self, self.op.snode)
4790       _CheckNodeNotDrained(self, self.op.snode)
4791       self.secondaries.append(self.op.snode)
4792
4793     nodenames = [pnode.name] + self.secondaries
4794
4795     req_size = _ComputeDiskSize(self.op.disk_template,
4796                                 self.disks)
4797
4798     # Check lv size requirements
4799     if req_size is not None:
4800       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4801                                          self.op.hypervisor)
4802       for node in nodenames:
4803         info = nodeinfo[node]
4804         info.Raise()
4805         info = info.data
4806         if not info:
4807           raise errors.OpPrereqError("Cannot get current information"
4808                                      " from node '%s'" % node)
4809         vg_free = info.get('vg_free', None)
4810         if not isinstance(vg_free, int):
4811           raise errors.OpPrereqError("Can't compute free disk space on"
4812                                      " node %s" % node)
4813         if req_size > info['vg_free']:
4814           raise errors.OpPrereqError("Not enough disk space on target node %s."
4815                                      " %d MB available, %d MB required" %
4816                                      (node, info['vg_free'], req_size))
4817
4818     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4819
4820     # os verification
4821     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4822     result.Raise()
4823     if not isinstance(result.data, objects.OS):
4824       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4825                                  " primary node"  % self.op.os_type)
4826
4827     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4828
4829     # memory check on primary node
4830     if self.op.start:
4831       _CheckNodeFreeMemory(self, self.pnode.name,
4832                            "creating instance %s" % self.op.instance_name,
4833                            self.be_full[constants.BE_MEMORY],
4834                            self.op.hypervisor)
4835
4836   def Exec(self, feedback_fn):
4837     """Create and add the instance to the cluster.
4838
4839     """
4840     instance = self.op.instance_name
4841     pnode_name = self.pnode.name
4842
4843     ht_kind = self.op.hypervisor
4844     if ht_kind in constants.HTS_REQ_PORT:
4845       network_port = self.cfg.AllocatePort()
4846     else:
4847       network_port = None
4848
4849     ##if self.op.vnc_bind_address is None:
4850     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4851
4852     # this is needed because os.path.join does not accept None arguments
4853     if self.op.file_storage_dir is None:
4854       string_file_storage_dir = ""
4855     else:
4856       string_file_storage_dir = self.op.file_storage_dir
4857
4858     # build the full file storage dir path
4859     file_storage_dir = os.path.normpath(os.path.join(
4860                                         self.cfg.GetFileStorageDir(),
4861                                         string_file_storage_dir, instance))
4862
4863
4864     disks = _GenerateDiskTemplate(self,
4865                                   self.op.disk_template,
4866                                   instance, pnode_name,
4867                                   self.secondaries,
4868                                   self.disks,
4869                                   file_storage_dir,
4870                                   self.op.file_driver,
4871                                   0)
4872
4873     iobj = objects.Instance(name=instance, os=self.op.os_type,
4874                             primary_node=pnode_name,
4875                             nics=self.nics, disks=disks,
4876                             disk_template=self.op.disk_template,
4877                             admin_up=False,
4878                             network_port=network_port,
4879                             beparams=self.op.beparams,
4880                             hvparams=self.op.hvparams,
4881                             hypervisor=self.op.hypervisor,
4882                             )
4883
4884     feedback_fn("* creating instance disks...")
4885     try:
4886       _CreateDisks(self, iobj)
4887     except errors.OpExecError:
4888       self.LogWarning("Device creation failed, reverting...")
4889       try:
4890         _RemoveDisks(self, iobj)
4891       finally:
4892         self.cfg.ReleaseDRBDMinors(instance)
4893         raise
4894
4895     feedback_fn("adding instance %s to cluster config" % instance)
4896
4897     self.cfg.AddInstance(iobj)
4898     # Declare that we don't want to remove the instance lock anymore, as we've
4899     # added the instance to the config
4900     del self.remove_locks[locking.LEVEL_INSTANCE]
4901     # Unlock all the nodes
4902     if self.op.mode == constants.INSTANCE_IMPORT:
4903       nodes_keep = [self.op.src_node]
4904       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4905                        if node != self.op.src_node]
4906       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4907       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4908     else:
4909       self.context.glm.release(locking.LEVEL_NODE)
4910       del self.acquired_locks[locking.LEVEL_NODE]
4911
4912     if self.op.wait_for_sync:
4913       disk_abort = not _WaitForSync(self, iobj)
4914     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4915       # make sure the disks are not degraded (still sync-ing is ok)
4916       time.sleep(15)
4917       feedback_fn("* checking mirrors status")
4918       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4919     else:
4920       disk_abort = False
4921
4922     if disk_abort:
4923       _RemoveDisks(self, iobj)
4924       self.cfg.RemoveInstance(iobj.name)
4925       # Make sure the instance lock gets removed
4926       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4927       raise errors.OpExecError("There are some degraded disks for"
4928                                " this instance")
4929
4930     feedback_fn("creating os for instance %s on node %s" %
4931                 (instance, pnode_name))
4932
4933     if iobj.disk_template != constants.DT_DISKLESS:
4934       if self.op.mode == constants.INSTANCE_CREATE:
4935         feedback_fn("* running the instance OS create scripts...")
4936         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4937         msg = result.RemoteFailMsg()
4938         if msg:
4939           raise errors.OpExecError("Could not add os for instance %s"
4940                                    " on node %s: %s" %
4941                                    (instance, pnode_name, msg))
4942
4943       elif self.op.mode == constants.INSTANCE_IMPORT:
4944         feedback_fn("* running the instance OS import scripts...")
4945         src_node = self.op.src_node
4946         src_images = self.src_images
4947         cluster_name = self.cfg.GetClusterName()
4948         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4949                                                          src_node, src_images,
4950                                                          cluster_name)
4951         import_result.Raise()
4952         for idx, result in enumerate(import_result.data):
4953           if not result:
4954             self.LogWarning("Could not import the image %s for instance"
4955                             " %s, disk %d, on node %s" %
4956                             (src_images[idx], instance, idx, pnode_name))
4957       else:
4958         # also checked in the prereq part
4959         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4960                                      % self.op.mode)
4961
4962     if self.op.start:
4963       iobj.admin_up = True
4964       self.cfg.Update(iobj)
4965       logging.info("Starting instance %s on node %s", instance, pnode_name)
4966       feedback_fn("* starting instance...")
4967       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4968       msg = result.RemoteFailMsg()
4969       if msg:
4970         raise errors.OpExecError("Could not start instance: %s" % msg)
4971
4972
4973 class LUConnectConsole(NoHooksLU):
4974   """Connect to an instance's console.
4975
4976   This is somewhat special in that it returns the command line that
4977   you need to run on the master node in order to connect to the
4978   console.
4979
4980   """
4981   _OP_REQP = ["instance_name"]
4982   REQ_BGL = False
4983
4984   def ExpandNames(self):
4985     self._ExpandAndLockInstance()
4986
4987   def CheckPrereq(self):
4988     """Check prerequisites.
4989
4990     This checks that the instance is in the cluster.
4991
4992     """
4993     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
4994     assert self.instance is not None, \
4995       "Cannot retrieve locked instance %s" % self.op.instance_name
4996     _CheckNodeOnline(self, self.instance.primary_node)
4997
4998   def Exec(self, feedback_fn):
4999     """Connect to the console of an instance
5000
5001     """
5002     instance = self.instance
5003     node = instance.primary_node
5004
5005     node_insts = self.rpc.call_instance_list([node],
5006                                              [instance.hypervisor])[node]
5007     node_insts.Raise()
5008
5009     if instance.name not in node_insts.data:
5010       raise errors.OpExecError("Instance %s is not running." % instance.name)
5011
5012     logging.debug("Connecting to console of %s on %s", instance.name, node)
5013
5014     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5015     cluster = self.cfg.GetClusterInfo()
5016     # beparams and hvparams are passed separately, to avoid editing the
5017     # instance and then saving the defaults in the instance itself.
5018     hvparams = cluster.FillHV(instance)
5019     beparams = cluster.FillBE(instance)
5020     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5021
5022     # build ssh cmdline
5023     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5024
5025
5026 class LUReplaceDisks(LogicalUnit):
5027   """Replace the disks of an instance.
5028
5029   """
5030   HPATH = "mirrors-replace"
5031   HTYPE = constants.HTYPE_INSTANCE
5032   _OP_REQP = ["instance_name", "mode", "disks"]
5033   REQ_BGL = False
5034
5035   def CheckArguments(self):
5036     if not hasattr(self.op, "remote_node"):
5037       self.op.remote_node = None
5038     if not hasattr(self.op, "iallocator"):
5039       self.op.iallocator = None
5040
5041     # check for valid parameter combination
5042     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5043     if self.op.mode == constants.REPLACE_DISK_CHG:
5044       if cnt == 2:
5045         raise errors.OpPrereqError("When changing the secondary either an"
5046                                    " iallocator script must be used or the"
5047                                    " new node given")
5048       elif cnt == 0:
5049         raise errors.OpPrereqError("Give either the iallocator or the new"
5050                                    " secondary, not both")
5051     else: # not replacing the secondary
5052       if cnt != 2:
5053         raise errors.OpPrereqError("The iallocator and new node options can"
5054                                    " be used only when changing the"
5055                                    " secondary node")
5056
5057   def ExpandNames(self):
5058     self._ExpandAndLockInstance()
5059
5060     if self.op.iallocator is not None:
5061       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5062     elif self.op.remote_node is not None:
5063       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5064       if remote_node is None:
5065         raise errors.OpPrereqError("Node '%s' not known" %
5066                                    self.op.remote_node)
5067       self.op.remote_node = remote_node
5068       # Warning: do not remove the locking of the new secondary here
5069       # unless DRBD8.AddChildren is changed to work in parallel;
5070       # currently it doesn't since parallel invocations of
5071       # FindUnusedMinor will conflict
5072       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5073       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5074     else:
5075       self.needed_locks[locking.LEVEL_NODE] = []
5076       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5077
5078   def DeclareLocks(self, level):
5079     # If we're not already locking all nodes in the set we have to declare the
5080     # instance's primary/secondary nodes.
5081     if (level == locking.LEVEL_NODE and
5082         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5083       self._LockInstancesNodes()
5084
5085   def _RunAllocator(self):
5086     """Compute a new secondary node using an IAllocator.
5087
5088     """
5089     ial = IAllocator(self,
5090                      mode=constants.IALLOCATOR_MODE_RELOC,
5091                      name=self.op.instance_name,
5092                      relocate_from=[self.sec_node])
5093
5094     ial.Run(self.op.iallocator)
5095
5096     if not ial.success:
5097       raise errors.OpPrereqError("Can't compute nodes using"
5098                                  " iallocator '%s': %s" % (self.op.iallocator,
5099                                                            ial.info))
5100     if len(ial.nodes) != ial.required_nodes:
5101       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5102                                  " of nodes (%s), required %s" %
5103                                  (len(ial.nodes), ial.required_nodes))
5104     self.op.remote_node = ial.nodes[0]
5105     self.LogInfo("Selected new secondary for the instance: %s",
5106                  self.op.remote_node)
5107
5108   def BuildHooksEnv(self):
5109     """Build hooks env.
5110
5111     This runs on the master, the primary and all the secondaries.
5112
5113     """
5114     env = {
5115       "MODE": self.op.mode,
5116       "NEW_SECONDARY": self.op.remote_node,
5117       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5118       }
5119     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5120     nl = [
5121       self.cfg.GetMasterNode(),
5122       self.instance.primary_node,
5123       ]
5124     if self.op.remote_node is not None:
5125       nl.append(self.op.remote_node)
5126     return env, nl, nl
5127
5128   def CheckPrereq(self):
5129     """Check prerequisites.
5130
5131     This checks that the instance is in the cluster.
5132
5133     """
5134     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5135     assert instance is not None, \
5136       "Cannot retrieve locked instance %s" % self.op.instance_name
5137     self.instance = instance
5138
5139     if instance.disk_template != constants.DT_DRBD8:
5140       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5141                                  " instances")
5142
5143     if len(instance.secondary_nodes) != 1:
5144       raise errors.OpPrereqError("The instance has a strange layout,"
5145                                  " expected one secondary but found %d" %
5146                                  len(instance.secondary_nodes))
5147
5148     self.sec_node = instance.secondary_nodes[0]
5149
5150     if self.op.iallocator is not None:
5151       self._RunAllocator()
5152
5153     remote_node = self.op.remote_node
5154     if remote_node is not None:
5155       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5156       assert self.remote_node_info is not None, \
5157         "Cannot retrieve locked node %s" % remote_node
5158     else:
5159       self.remote_node_info = None
5160     if remote_node == instance.primary_node:
5161       raise errors.OpPrereqError("The specified node is the primary node of"
5162                                  " the instance.")
5163     elif remote_node == self.sec_node:
5164       raise errors.OpPrereqError("The specified node is already the"
5165                                  " secondary node of the instance.")
5166
5167     if self.op.mode == constants.REPLACE_DISK_PRI:
5168       n1 = self.tgt_node = instance.primary_node
5169       n2 = self.oth_node = self.sec_node
5170     elif self.op.mode == constants.REPLACE_DISK_SEC:
5171       n1 = self.tgt_node = self.sec_node
5172       n2 = self.oth_node = instance.primary_node
5173     elif self.op.mode == constants.REPLACE_DISK_CHG:
5174       n1 = self.new_node = remote_node
5175       n2 = self.oth_node = instance.primary_node
5176       self.tgt_node = self.sec_node
5177       _CheckNodeNotDrained(self, remote_node)
5178     else:
5179       raise errors.ProgrammerError("Unhandled disk replace mode")
5180
5181     _CheckNodeOnline(self, n1)
5182     _CheckNodeOnline(self, n2)
5183
5184     if not self.op.disks:
5185       self.op.disks = range(len(instance.disks))
5186
5187     for disk_idx in self.op.disks:
5188       instance.FindDisk(disk_idx)
5189
5190   def _ExecD8DiskOnly(self, feedback_fn):
5191     """Replace a disk on the primary or secondary for dbrd8.
5192
5193     The algorithm for replace is quite complicated:
5194
5195       1. for each disk to be replaced:
5196
5197         1. create new LVs on the target node with unique names
5198         1. detach old LVs from the drbd device
5199         1. rename old LVs to name_replaced.<time_t>
5200         1. rename new LVs to old LVs
5201         1. attach the new LVs (with the old names now) to the drbd device
5202
5203       1. wait for sync across all devices
5204
5205       1. for each modified disk:
5206
5207         1. remove old LVs (which have the name name_replaces.<time_t>)
5208
5209     Failures are not very well handled.
5210
5211     """
5212     steps_total = 6
5213     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5214     instance = self.instance
5215     iv_names = {}
5216     vgname = self.cfg.GetVGName()
5217     # start of work
5218     cfg = self.cfg
5219     tgt_node = self.tgt_node
5220     oth_node = self.oth_node
5221
5222     # Step: check device activation
5223     self.proc.LogStep(1, steps_total, "check device existence")
5224     info("checking volume groups")
5225     my_vg = cfg.GetVGName()
5226     results = self.rpc.call_vg_list([oth_node, tgt_node])
5227     if not results:
5228       raise errors.OpExecError("Can't list volume groups on the nodes")
5229     for node in oth_node, tgt_node:
5230       res = results[node]
5231       if res.failed or not res.data or my_vg not in res.data:
5232         raise errors.OpExecError("Volume group '%s' not found on %s" %
5233                                  (my_vg, node))
5234     for idx, dev in enumerate(instance.disks):
5235       if idx not in self.op.disks:
5236         continue
5237       for node in tgt_node, oth_node:
5238         info("checking disk/%d on %s" % (idx, node))
5239         cfg.SetDiskID(dev, node)
5240         result = self.rpc.call_blockdev_find(node, dev)
5241         msg = result.RemoteFailMsg()
5242         if not msg and not result.payload:
5243           msg = "disk not found"
5244         if msg:
5245           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5246                                    (idx, node, msg))
5247
5248     # Step: check other node consistency
5249     self.proc.LogStep(2, steps_total, "check peer consistency")
5250     for idx, dev in enumerate(instance.disks):
5251       if idx not in self.op.disks:
5252         continue
5253       info("checking disk/%d consistency on %s" % (idx, oth_node))
5254       if not _CheckDiskConsistency(self, dev, oth_node,
5255                                    oth_node==instance.primary_node):
5256         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5257                                  " to replace disks on this node (%s)" %
5258                                  (oth_node, tgt_node))
5259
5260     # Step: create new storage
5261     self.proc.LogStep(3, steps_total, "allocate new storage")
5262     for idx, dev in enumerate(instance.disks):
5263       if idx not in self.op.disks:
5264         continue
5265       size = dev.size
5266       cfg.SetDiskID(dev, tgt_node)
5267       lv_names = [".disk%d_%s" % (idx, suf)
5268                   for suf in ["data", "meta"]]
5269       names = _GenerateUniqueNames(self, lv_names)
5270       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5271                              logical_id=(vgname, names[0]))
5272       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5273                              logical_id=(vgname, names[1]))
5274       new_lvs = [lv_data, lv_meta]
5275       old_lvs = dev.children
5276       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5277       info("creating new local storage on %s for %s" %
5278            (tgt_node, dev.iv_name))
5279       # we pass force_create=True to force the LVM creation
5280       for new_lv in new_lvs:
5281         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5282                         _GetInstanceInfoText(instance), False)
5283
5284     # Step: for each lv, detach+rename*2+attach
5285     self.proc.LogStep(4, steps_total, "change drbd configuration")
5286     for dev, old_lvs, new_lvs in iv_names.itervalues():
5287       info("detaching %s drbd from local storage" % dev.iv_name)
5288       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5289       msg = result.RemoteFailMsg()
5290       if msg:
5291         raise errors.OpExecError("Can't detach drbd from local storage on node"
5292                                  " %s for device %s: %s" %
5293                                  (tgt_node, dev.iv_name, msg))
5294       #dev.children = []
5295       #cfg.Update(instance)
5296
5297       # ok, we created the new LVs, so now we know we have the needed
5298       # storage; as such, we proceed on the target node to rename
5299       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5300       # using the assumption that logical_id == physical_id (which in
5301       # turn is the unique_id on that node)
5302
5303       # FIXME(iustin): use a better name for the replaced LVs
5304       temp_suffix = int(time.time())
5305       ren_fn = lambda d, suff: (d.physical_id[0],
5306                                 d.physical_id[1] + "_replaced-%s" % suff)
5307       # build the rename list based on what LVs exist on the node
5308       rlist = []
5309       for to_ren in old_lvs:
5310         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5311         if not result.RemoteFailMsg() and result.payload:
5312           # device exists
5313           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5314
5315       info("renaming the old LVs on the target node")
5316       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5317       msg = result.RemoteFailMsg()
5318       if msg:
5319         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5320                                  (tgt_node, msg))
5321       # now we rename the new LVs to the old LVs
5322       info("renaming the new LVs on the target node")
5323       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5324       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5325       msg = result.RemoteFailMsg()
5326       if msg:
5327         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5328                                  (tgt_node, msg))
5329
5330       for old, new in zip(old_lvs, new_lvs):
5331         new.logical_id = old.logical_id
5332         cfg.SetDiskID(new, tgt_node)
5333
5334       for disk in old_lvs:
5335         disk.logical_id = ren_fn(disk, temp_suffix)
5336         cfg.SetDiskID(disk, tgt_node)
5337
5338       # now that the new lvs have the old name, we can add them to the device
5339       info("adding new mirror component on %s" % tgt_node)
5340       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5341       msg = result.RemoteFailMsg()
5342       if msg:
5343         for new_lv in new_lvs:
5344           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5345           if msg:
5346             warning("Can't rollback device %s: %s", dev, msg,
5347                     hint="cleanup manually the unused logical volumes")
5348         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5349
5350       dev.children = new_lvs
5351       cfg.Update(instance)
5352
5353     # Step: wait for sync
5354
5355     # this can fail as the old devices are degraded and _WaitForSync
5356     # does a combined result over all disks, so we don't check its
5357     # return value
5358     self.proc.LogStep(5, steps_total, "sync devices")
5359     _WaitForSync(self, instance, unlock=True)
5360
5361     # so check manually all the devices
5362     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5363       cfg.SetDiskID(dev, instance.primary_node)
5364       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5365       msg = result.RemoteFailMsg()
5366       if not msg and not result.payload:
5367         msg = "disk not found"
5368       if msg:
5369         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5370                                  (name, msg))
5371       if result.payload[5]:
5372         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5373
5374     # Step: remove old storage
5375     self.proc.LogStep(6, steps_total, "removing old storage")
5376     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5377       info("remove logical volumes for %s" % name)
5378       for lv in old_lvs:
5379         cfg.SetDiskID(lv, tgt_node)
5380         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5381         if msg:
5382           warning("Can't remove old LV: %s" % msg,
5383                   hint="manually remove unused LVs")
5384           continue
5385
5386   def _ExecD8Secondary(self, feedback_fn):
5387     """Replace the secondary node for drbd8.
5388
5389     The algorithm for replace is quite complicated:
5390       - for all disks of the instance:
5391         - create new LVs on the new node with same names
5392         - shutdown the drbd device on the old secondary
5393         - disconnect the drbd network on the primary
5394         - create the drbd device on the new secondary
5395         - network attach the drbd on the primary, using an artifice:
5396           the drbd code for Attach() will connect to the network if it
5397           finds a device which is connected to the good local disks but
5398           not network enabled
5399       - wait for sync across all devices
5400       - remove all disks from the old secondary
5401
5402     Failures are not very well handled.
5403
5404     """
5405     steps_total = 6
5406     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5407     instance = self.instance
5408     iv_names = {}
5409     # start of work
5410     cfg = self.cfg
5411     old_node = self.tgt_node
5412     new_node = self.new_node
5413     pri_node = instance.primary_node
5414     nodes_ip = {
5415       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5416       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5417       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5418       }
5419
5420     # Step: check device activation
5421     self.proc.LogStep(1, steps_total, "check device existence")
5422     info("checking volume groups")
5423     my_vg = cfg.GetVGName()
5424     results = self.rpc.call_vg_list([pri_node, new_node])
5425     for node in pri_node, new_node:
5426       res = results[node]
5427       if res.failed or not res.data or my_vg not in res.data:
5428         raise errors.OpExecError("Volume group '%s' not found on %s" %
5429                                  (my_vg, node))
5430     for idx, dev in enumerate(instance.disks):
5431       if idx not in self.op.disks:
5432         continue
5433       info("checking disk/%d on %s" % (idx, pri_node))
5434       cfg.SetDiskID(dev, pri_node)
5435       result = self.rpc.call_blockdev_find(pri_node, dev)
5436       msg = result.RemoteFailMsg()
5437       if not msg and not result.payload:
5438         msg = "disk not found"
5439       if msg:
5440         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5441                                  (idx, pri_node, msg))
5442
5443     # Step: check other node consistency
5444     self.proc.LogStep(2, steps_total, "check peer consistency")
5445     for idx, dev in enumerate(instance.disks):
5446       if idx not in self.op.disks:
5447         continue
5448       info("checking disk/%d consistency on %s" % (idx, pri_node))
5449       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5450         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5451                                  " unsafe to replace the secondary" %
5452                                  pri_node)
5453
5454     # Step: create new storage
5455     self.proc.LogStep(3, steps_total, "allocate new storage")
5456     for idx, dev in enumerate(instance.disks):
5457       info("adding new local storage on %s for disk/%d" %
5458            (new_node, idx))
5459       # we pass force_create=True to force LVM creation
5460       for new_lv in dev.children:
5461         _CreateBlockDev(self, new_node, instance, new_lv, True,
5462                         _GetInstanceInfoText(instance), False)
5463
5464     # Step 4: dbrd minors and drbd setups changes
5465     # after this, we must manually remove the drbd minors on both the
5466     # error and the success paths
5467     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5468                                    instance.name)
5469     logging.debug("Allocated minors %s" % (minors,))
5470     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5471     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5472       size = dev.size
5473       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5474       # create new devices on new_node; note that we create two IDs:
5475       # one without port, so the drbd will be activated without
5476       # networking information on the new node at this stage, and one
5477       # with network, for the latter activation in step 4
5478       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5479       if pri_node == o_node1:
5480         p_minor = o_minor1
5481       else:
5482         p_minor = o_minor2
5483
5484       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5485       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5486
5487       iv_names[idx] = (dev, dev.children, new_net_id)
5488       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5489                     new_net_id)
5490       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5491                               logical_id=new_alone_id,
5492                               children=dev.children)
5493       try:
5494         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5495                               _GetInstanceInfoText(instance), False)
5496       except errors.GenericError:
5497         self.cfg.ReleaseDRBDMinors(instance.name)
5498         raise
5499
5500     for idx, dev in enumerate(instance.disks):
5501       # we have new devices, shutdown the drbd on the old secondary
5502       info("shutting down drbd for disk/%d on old node" % idx)
5503       cfg.SetDiskID(dev, old_node)
5504       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5505       if msg:
5506         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5507                 (idx, msg),
5508                 hint="Please cleanup this device manually as soon as possible")
5509
5510     info("detaching primary drbds from the network (=> standalone)")
5511     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5512                                                instance.disks)[pri_node]
5513
5514     msg = result.RemoteFailMsg()
5515     if msg:
5516       # detaches didn't succeed (unlikely)
5517       self.cfg.ReleaseDRBDMinors(instance.name)
5518       raise errors.OpExecError("Can't detach the disks from the network on"
5519                                " old node: %s" % (msg,))
5520
5521     # if we managed to detach at least one, we update all the disks of
5522     # the instance to point to the new secondary
5523     info("updating instance configuration")
5524     for dev, _, new_logical_id in iv_names.itervalues():
5525       dev.logical_id = new_logical_id
5526       cfg.SetDiskID(dev, pri_node)
5527     cfg.Update(instance)
5528
5529     # and now perform the drbd attach
5530     info("attaching primary drbds to new secondary (standalone => connected)")
5531     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5532                                            instance.disks, instance.name,
5533                                            False)
5534     for to_node, to_result in result.items():
5535       msg = to_result.RemoteFailMsg()
5536       if msg:
5537         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5538                 hint="please do a gnt-instance info to see the"
5539                 " status of disks")
5540
5541     # this can fail as the old devices are degraded and _WaitForSync
5542     # does a combined result over all disks, so we don't check its
5543     # return value
5544     self.proc.LogStep(5, steps_total, "sync devices")
5545     _WaitForSync(self, instance, unlock=True)
5546
5547     # so check manually all the devices
5548     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5549       cfg.SetDiskID(dev, pri_node)
5550       result = self.rpc.call_blockdev_find(pri_node, dev)
5551       msg = result.RemoteFailMsg()
5552       if not msg and not result.payload:
5553         msg = "disk not found"
5554       if msg:
5555         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5556                                  (idx, msg))
5557       if result.payload[5]:
5558         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5559
5560     self.proc.LogStep(6, steps_total, "removing old storage")
5561     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5562       info("remove logical volumes for disk/%d" % idx)
5563       for lv in old_lvs:
5564         cfg.SetDiskID(lv, old_node)
5565         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5566         if msg:
5567           warning("Can't remove LV on old secondary: %s", msg,
5568                   hint="Cleanup stale volumes by hand")
5569
5570   def Exec(self, feedback_fn):
5571     """Execute disk replacement.
5572
5573     This dispatches the disk replacement to the appropriate handler.
5574
5575     """
5576     instance = self.instance
5577
5578     # Activate the instance disks if we're replacing them on a down instance
5579     if not instance.admin_up:
5580       _StartInstanceDisks(self, instance, True)
5581
5582     if self.op.mode == constants.REPLACE_DISK_CHG:
5583       fn = self._ExecD8Secondary
5584     else:
5585       fn = self._ExecD8DiskOnly
5586
5587     ret = fn(feedback_fn)
5588
5589     # Deactivate the instance disks if we're replacing them on a down instance
5590     if not instance.admin_up:
5591       _SafeShutdownInstanceDisks(self, instance)
5592
5593     return ret
5594
5595
5596 class LUGrowDisk(LogicalUnit):
5597   """Grow a disk of an instance.
5598
5599   """
5600   HPATH = "disk-grow"
5601   HTYPE = constants.HTYPE_INSTANCE
5602   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5603   REQ_BGL = False
5604
5605   def ExpandNames(self):
5606     self._ExpandAndLockInstance()
5607     self.needed_locks[locking.LEVEL_NODE] = []
5608     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5609
5610   def DeclareLocks(self, level):
5611     if level == locking.LEVEL_NODE:
5612       self._LockInstancesNodes()
5613
5614   def BuildHooksEnv(self):
5615     """Build hooks env.
5616
5617     This runs on the master, the primary and all the secondaries.
5618
5619     """
5620     env = {
5621       "DISK": self.op.disk,
5622       "AMOUNT": self.op.amount,
5623       }
5624     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5625     nl = [
5626       self.cfg.GetMasterNode(),
5627       self.instance.primary_node,
5628       ]
5629     return env, nl, nl
5630
5631   def CheckPrereq(self):
5632     """Check prerequisites.
5633
5634     This checks that the instance is in the cluster.
5635
5636     """
5637     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5638     assert instance is not None, \
5639       "Cannot retrieve locked instance %s" % self.op.instance_name
5640     nodenames = list(instance.all_nodes)
5641     for node in nodenames:
5642       _CheckNodeOnline(self, node)
5643
5644
5645     self.instance = instance
5646
5647     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5648       raise errors.OpPrereqError("Instance's disk layout does not support"
5649                                  " growing.")
5650
5651     self.disk = instance.FindDisk(self.op.disk)
5652
5653     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5654                                        instance.hypervisor)
5655     for node in nodenames:
5656       info = nodeinfo[node]
5657       if info.failed or not info.data:
5658         raise errors.OpPrereqError("Cannot get current information"
5659                                    " from node '%s'" % node)
5660       vg_free = info.data.get('vg_free', None)
5661       if not isinstance(vg_free, int):
5662         raise errors.OpPrereqError("Can't compute free disk space on"
5663                                    " node %s" % node)
5664       if self.op.amount > vg_free:
5665         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5666                                    " %d MiB available, %d MiB required" %
5667                                    (node, vg_free, self.op.amount))
5668
5669   def Exec(self, feedback_fn):
5670     """Execute disk grow.
5671
5672     """
5673     instance = self.instance
5674     disk = self.disk
5675     for node in instance.all_nodes:
5676       self.cfg.SetDiskID(disk, node)
5677       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5678       msg = result.RemoteFailMsg()
5679       if msg:
5680         raise errors.OpExecError("Grow request failed to node %s: %s" %
5681                                  (node, msg))
5682     disk.RecordGrow(self.op.amount)
5683     self.cfg.Update(instance)
5684     if self.op.wait_for_sync:
5685       disk_abort = not _WaitForSync(self, instance)
5686       if disk_abort:
5687         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5688                              " status.\nPlease check the instance.")
5689
5690
5691 class LUQueryInstanceData(NoHooksLU):
5692   """Query runtime instance data.
5693
5694   """
5695   _OP_REQP = ["instances", "static"]
5696   REQ_BGL = False
5697
5698   def ExpandNames(self):
5699     self.needed_locks = {}
5700     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5701
5702     if not isinstance(self.op.instances, list):
5703       raise errors.OpPrereqError("Invalid argument type 'instances'")
5704
5705     if self.op.instances:
5706       self.wanted_names = []
5707       for name in self.op.instances:
5708         full_name = self.cfg.ExpandInstanceName(name)
5709         if full_name is None:
5710           raise errors.OpPrereqError("Instance '%s' not known" % name)
5711         self.wanted_names.append(full_name)
5712       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5713     else:
5714       self.wanted_names = None
5715       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5716
5717     self.needed_locks[locking.LEVEL_NODE] = []
5718     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5719
5720   def DeclareLocks(self, level):
5721     if level == locking.LEVEL_NODE:
5722       self._LockInstancesNodes()
5723
5724   def CheckPrereq(self):
5725     """Check prerequisites.
5726
5727     This only checks the optional instance list against the existing names.
5728
5729     """
5730     if self.wanted_names is None:
5731       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5732
5733     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5734                              in self.wanted_names]
5735     return
5736
5737   def _ComputeDiskStatus(self, instance, snode, dev):
5738     """Compute block device status.
5739
5740     """
5741     static = self.op.static
5742     if not static:
5743       self.cfg.SetDiskID(dev, instance.primary_node)
5744       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5745       if dev_pstatus.offline:
5746         dev_pstatus = None
5747       else:
5748         msg = dev_pstatus.RemoteFailMsg()
5749         if msg:
5750           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5751                                    (instance.name, msg))
5752         dev_pstatus = dev_pstatus.payload
5753     else:
5754       dev_pstatus = None
5755
5756     if dev.dev_type in constants.LDS_DRBD:
5757       # we change the snode then (otherwise we use the one passed in)
5758       if dev.logical_id[0] == instance.primary_node:
5759         snode = dev.logical_id[1]
5760       else:
5761         snode = dev.logical_id[0]
5762
5763     if snode and not static:
5764       self.cfg.SetDiskID(dev, snode)
5765       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5766       if dev_sstatus.offline:
5767         dev_sstatus = None
5768       else:
5769         msg = dev_sstatus.RemoteFailMsg()
5770         if msg:
5771           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5772                                    (instance.name, msg))
5773         dev_sstatus = dev_sstatus.payload
5774     else:
5775       dev_sstatus = None
5776
5777     if dev.children:
5778       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5779                       for child in dev.children]
5780     else:
5781       dev_children = []
5782
5783     data = {
5784       "iv_name": dev.iv_name,
5785       "dev_type": dev.dev_type,
5786       "logical_id": dev.logical_id,
5787       "physical_id": dev.physical_id,
5788       "pstatus": dev_pstatus,
5789       "sstatus": dev_sstatus,
5790       "children": dev_children,
5791       "mode": dev.mode,
5792       }
5793
5794     return data
5795
5796   def Exec(self, feedback_fn):
5797     """Gather and return data"""
5798     result = {}
5799
5800     cluster = self.cfg.GetClusterInfo()
5801
5802     for instance in self.wanted_instances:
5803       if not self.op.static:
5804         remote_info = self.rpc.call_instance_info(instance.primary_node,
5805                                                   instance.name,
5806                                                   instance.hypervisor)
5807         remote_info.Raise()
5808         remote_info = remote_info.data
5809         if remote_info and "state" in remote_info:
5810           remote_state = "up"
5811         else:
5812           remote_state = "down"
5813       else:
5814         remote_state = None
5815       if instance.admin_up:
5816         config_state = "up"
5817       else:
5818         config_state = "down"
5819
5820       disks = [self._ComputeDiskStatus(instance, None, device)
5821                for device in instance.disks]
5822
5823       idict = {
5824         "name": instance.name,
5825         "config_state": config_state,
5826         "run_state": remote_state,
5827         "pnode": instance.primary_node,
5828         "snodes": instance.secondary_nodes,
5829         "os": instance.os,
5830         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5831         "disks": disks,
5832         "hypervisor": instance.hypervisor,
5833         "network_port": instance.network_port,
5834         "hv_instance": instance.hvparams,
5835         "hv_actual": cluster.FillHV(instance),
5836         "be_instance": instance.beparams,
5837         "be_actual": cluster.FillBE(instance),
5838         }
5839
5840       result[instance.name] = idict
5841
5842     return result
5843
5844
5845 class LUSetInstanceParams(LogicalUnit):
5846   """Modifies an instances's parameters.
5847
5848   """
5849   HPATH = "instance-modify"
5850   HTYPE = constants.HTYPE_INSTANCE
5851   _OP_REQP = ["instance_name"]
5852   REQ_BGL = False
5853
5854   def CheckArguments(self):
5855     if not hasattr(self.op, 'nics'):
5856       self.op.nics = []
5857     if not hasattr(self.op, 'disks'):
5858       self.op.disks = []
5859     if not hasattr(self.op, 'beparams'):
5860       self.op.beparams = {}
5861     if not hasattr(self.op, 'hvparams'):
5862       self.op.hvparams = {}
5863     self.op.force = getattr(self.op, "force", False)
5864     if not (self.op.nics or self.op.disks or
5865             self.op.hvparams or self.op.beparams):
5866       raise errors.OpPrereqError("No changes submitted")
5867
5868     # Disk validation
5869     disk_addremove = 0
5870     for disk_op, disk_dict in self.op.disks:
5871       if disk_op == constants.DDM_REMOVE:
5872         disk_addremove += 1
5873         continue
5874       elif disk_op == constants.DDM_ADD:
5875         disk_addremove += 1
5876       else:
5877         if not isinstance(disk_op, int):
5878           raise errors.OpPrereqError("Invalid disk index")
5879       if disk_op == constants.DDM_ADD:
5880         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5881         if mode not in constants.DISK_ACCESS_SET:
5882           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5883         size = disk_dict.get('size', None)
5884         if size is None:
5885           raise errors.OpPrereqError("Required disk parameter size missing")
5886         try:
5887           size = int(size)
5888         except ValueError, err:
5889           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5890                                      str(err))
5891         disk_dict['size'] = size
5892       else:
5893         # modification of disk
5894         if 'size' in disk_dict:
5895           raise errors.OpPrereqError("Disk size change not possible, use"
5896                                      " grow-disk")
5897
5898     if disk_addremove > 1:
5899       raise errors.OpPrereqError("Only one disk add or remove operation"
5900                                  " supported at a time")
5901
5902     # NIC validation
5903     nic_addremove = 0
5904     for nic_op, nic_dict in self.op.nics:
5905       if nic_op == constants.DDM_REMOVE:
5906         nic_addremove += 1
5907         continue
5908       elif nic_op == constants.DDM_ADD:
5909         nic_addremove += 1
5910       else:
5911         if not isinstance(nic_op, int):
5912           raise errors.OpPrereqError("Invalid nic index")
5913
5914       # nic_dict should be a dict
5915       nic_ip = nic_dict.get('ip', None)
5916       if nic_ip is not None:
5917         if nic_ip.lower() == constants.VALUE_NONE:
5918           nic_dict['ip'] = None
5919         else:
5920           if not utils.IsValidIP(nic_ip):
5921             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5922
5923       nic_bridge = nic_dict.get('bridge', None)
5924       nic_link = nic_dict.get('link', None)
5925       if nic_bridge and nic_link:
5926         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5927       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5928         nic_dict['bridge'] = None
5929       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5930         nic_dict['link'] = None
5931
5932       if nic_op == constants.DDM_ADD:
5933         nic_mac = nic_dict.get('mac', None)
5934         if nic_mac is None:
5935           nic_dict['mac'] = constants.VALUE_AUTO
5936
5937       if 'mac' in nic_dict:
5938         nic_mac = nic_dict['mac']
5939         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5940           if not utils.IsValidMac(nic_mac):
5941             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5942         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5943           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5944                                      " modifying an existing nic")
5945
5946     if nic_addremove > 1:
5947       raise errors.OpPrereqError("Only one NIC add or remove operation"
5948                                  " supported at a time")
5949
5950   def ExpandNames(self):
5951     self._ExpandAndLockInstance()
5952     self.needed_locks[locking.LEVEL_NODE] = []
5953     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5954
5955   def DeclareLocks(self, level):
5956     if level == locking.LEVEL_NODE:
5957       self._LockInstancesNodes()
5958
5959   def BuildHooksEnv(self):
5960     """Build hooks env.
5961
5962     This runs on the master, primary and secondaries.
5963
5964     """
5965     args = dict()
5966     if constants.BE_MEMORY in self.be_new:
5967       args['memory'] = self.be_new[constants.BE_MEMORY]
5968     if constants.BE_VCPUS in self.be_new:
5969       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5970     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
5971     # information at all.
5972     if self.op.nics:
5973       args['nics'] = []
5974       nic_override = dict(self.op.nics)
5975       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
5976       for idx, nic in enumerate(self.instance.nics):
5977         if idx in nic_override:
5978           this_nic_override = nic_override[idx]
5979         else:
5980           this_nic_override = {}
5981         if 'ip' in this_nic_override:
5982           ip = this_nic_override['ip']
5983         else:
5984           ip = nic.ip
5985         if 'mac' in this_nic_override:
5986           mac = this_nic_override['mac']
5987         else:
5988           mac = nic.mac
5989         if idx in self.nic_pnew:
5990           nicparams = self.nic_pnew[idx]
5991         else:
5992           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
5993         mode = nicparams[constants.NIC_MODE]
5994         link = nicparams[constants.NIC_LINK]
5995         args['nics'].append((ip, mac, mode, link))
5996       if constants.DDM_ADD in nic_override:
5997         ip = nic_override[constants.DDM_ADD].get('ip', None)
5998         mac = nic_override[constants.DDM_ADD]['mac']
5999         nicparams = self.nic_pnew[constants.DDM_ADD]
6000         mode = nicparams[constants.NIC_MODE]
6001         link = nicparams[constants.NIC_LINK]
6002         args['nics'].append((ip, mac, mode, link))
6003       elif constants.DDM_REMOVE in nic_override:
6004         del args['nics'][-1]
6005
6006     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6007     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6008     return env, nl, nl
6009
6010   def _GetUpdatedParams(self, old_params, update_dict,
6011                         default_values, parameter_types):
6012     """Return the new params dict for the given params.
6013
6014     @type old_params: dict
6015     @type old_params: old parameters
6016     @type update_dict: dict
6017     @type update_dict: dict containing new parameter values,
6018                        or constants.VALUE_DEFAULT to reset the
6019                        parameter to its default value
6020     @type default_values: dict
6021     @param default_values: default values for the filled parameters
6022     @type parameter_types: dict
6023     @param parameter_types: dict mapping target dict keys to types
6024                             in constants.ENFORCEABLE_TYPES
6025     @rtype: (dict, dict)
6026     @return: (new_parameters, filled_parameters)
6027
6028     """
6029     params_copy = copy.deepcopy(old_params)
6030     for key, val in update_dict.iteritems():
6031       if val == constants.VALUE_DEFAULT:
6032         try:
6033           del params_copy[key]
6034         except KeyError:
6035           pass
6036       else:
6037         params_copy[key] = val
6038     utils.ForceDictType(params_copy, parameter_types)
6039     params_filled = objects.FillDict(default_values, params_copy)
6040     return (params_copy, params_filled)
6041
6042   def CheckPrereq(self):
6043     """Check prerequisites.
6044
6045     This only checks the instance list against the existing names.
6046
6047     """
6048     force = self.force = self.op.force
6049
6050     # checking the new params on the primary/secondary nodes
6051
6052     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6053     cluster = self.cluster = self.cfg.GetClusterInfo()
6054     assert self.instance is not None, \
6055       "Cannot retrieve locked instance %s" % self.op.instance_name
6056     pnode = instance.primary_node
6057     nodelist = list(instance.all_nodes)
6058
6059     # hvparams processing
6060     if self.op.hvparams:
6061       i_hvdict, hv_new = self._GetUpdatedParams(
6062                              instance.hvparams, self.op.hvparams,
6063                              cluster.hvparams[instance.hypervisor],
6064                              constants.HVS_PARAMETER_TYPES)
6065       # local check
6066       hypervisor.GetHypervisor(
6067         instance.hypervisor).CheckParameterSyntax(hv_new)
6068       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6069       self.hv_new = hv_new # the new actual values
6070       self.hv_inst = i_hvdict # the new dict (without defaults)
6071     else:
6072       self.hv_new = self.hv_inst = {}
6073
6074     # beparams processing
6075     if self.op.beparams:
6076       i_bedict, be_new = self._GetUpdatedParams(
6077                              instance.beparams, self.op.beparams,
6078                              cluster.beparams[constants.PP_DEFAULT],
6079                              constants.BES_PARAMETER_TYPES)
6080       self.be_new = be_new # the new actual values
6081       self.be_inst = i_bedict # the new dict (without defaults)
6082     else:
6083       self.be_new = self.be_inst = {}
6084
6085     self.warn = []
6086
6087     if constants.BE_MEMORY in self.op.beparams and not self.force:
6088       mem_check_list = [pnode]
6089       if be_new[constants.BE_AUTO_BALANCE]:
6090         # either we changed auto_balance to yes or it was from before
6091         mem_check_list.extend(instance.secondary_nodes)
6092       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6093                                                   instance.hypervisor)
6094       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6095                                          instance.hypervisor)
6096       if nodeinfo[pnode].failed or not isinstance(nodeinfo[pnode].data, dict):
6097         # Assume the primary node is unreachable and go ahead
6098         self.warn.append("Can't get info from primary node %s" % pnode)
6099       else:
6100         if not instance_info.failed and instance_info.data:
6101           current_mem = int(instance_info.data['memory'])
6102         else:
6103           # Assume instance not running
6104           # (there is a slight race condition here, but it's not very probable,
6105           # and we have no other way to check)
6106           current_mem = 0
6107         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6108                     nodeinfo[pnode].data['memory_free'])
6109         if miss_mem > 0:
6110           raise errors.OpPrereqError("This change will prevent the instance"
6111                                      " from starting, due to %d MB of memory"
6112                                      " missing on its primary node" % miss_mem)
6113
6114       if be_new[constants.BE_AUTO_BALANCE]:
6115         for node, nres in nodeinfo.iteritems():
6116           if node not in instance.secondary_nodes:
6117             continue
6118           if nres.failed or not isinstance(nres.data, dict):
6119             self.warn.append("Can't get info from secondary node %s" % node)
6120           elif be_new[constants.BE_MEMORY] > nres.data['memory_free']:
6121             self.warn.append("Not enough memory to failover instance to"
6122                              " secondary node %s" % node)
6123
6124     # NIC processing
6125     self.nic_pnew = {}
6126     self.nic_pinst = {}
6127     for nic_op, nic_dict in self.op.nics:
6128       if nic_op == constants.DDM_REMOVE:
6129         if not instance.nics:
6130           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6131         continue
6132       if nic_op != constants.DDM_ADD:
6133         # an existing nic
6134         if nic_op < 0 or nic_op >= len(instance.nics):
6135           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6136                                      " are 0 to %d" %
6137                                      (nic_op, len(instance.nics)))
6138         old_nic_params = instance.nics[nic_op].nicparams
6139         old_nic_ip = instance.nics[nic_op].ip
6140       else:
6141         old_nic_params = {}
6142         old_nic_ip = None
6143
6144       update_params_dict = dict([(key, nic_dict[key])
6145                                  for key in constants.NICS_PARAMETERS
6146                                  if key in nic_dict])
6147
6148       if 'bridge' in nic_dict:
6149         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6150
6151       new_nic_params, new_filled_nic_params = \
6152           self._GetUpdatedParams(old_nic_params, update_params_dict,
6153                                  cluster.nicparams[constants.PP_DEFAULT],
6154                                  constants.NICS_PARAMETER_TYPES)
6155       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6156       self.nic_pinst[nic_op] = new_nic_params
6157       self.nic_pnew[nic_op] = new_filled_nic_params
6158       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6159
6160       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6161         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6162         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6163         result.Raise()
6164         if not result.data:
6165           msg = ("Bridge '%s' doesn't exist on one of"
6166                  " the instance nodes" % nic_bridge)
6167           if self.force:
6168             self.warn.append(msg)
6169           else:
6170             raise errors.OpPrereqError(msg)
6171       if new_nic_mode == constants.NIC_MODE_ROUTED:
6172         if 'ip' in nic_dict:
6173           nic_ip = nic_dict['ip']
6174         else:
6175           nic_ip = old_nic_ip
6176         if nic_ip is None:
6177           raise errors.OpPrereqError('Cannot set the nic ip to None'
6178                                      ' on a routed nic')
6179       if 'mac' in nic_dict:
6180         nic_mac = nic_dict['mac']
6181         if nic_mac is None:
6182           raise errors.OpPrereqError('Cannot set the nic mac to None')
6183         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6184           # otherwise generate the mac
6185           nic_dict['mac'] = self.cfg.GenerateMAC()
6186         else:
6187           # or validate/reserve the current one
6188           if self.cfg.IsMacInUse(nic_mac):
6189             raise errors.OpPrereqError("MAC address %s already in use"
6190                                        " in cluster" % nic_mac)
6191
6192     # DISK processing
6193     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6194       raise errors.OpPrereqError("Disk operations not supported for"
6195                                  " diskless instances")
6196     for disk_op, disk_dict in self.op.disks:
6197       if disk_op == constants.DDM_REMOVE:
6198         if len(instance.disks) == 1:
6199           raise errors.OpPrereqError("Cannot remove the last disk of"
6200                                      " an instance")
6201         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6202         ins_l = ins_l[pnode]
6203         if ins_l.failed or not isinstance(ins_l.data, list):
6204           raise errors.OpPrereqError("Can't contact node '%s'" % pnode)
6205         if instance.name in ins_l.data:
6206           raise errors.OpPrereqError("Instance is running, can't remove"
6207                                      " disks.")
6208
6209       if (disk_op == constants.DDM_ADD and
6210           len(instance.nics) >= constants.MAX_DISKS):
6211         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6212                                    " add more" % constants.MAX_DISKS)
6213       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6214         # an existing disk
6215         if disk_op < 0 or disk_op >= len(instance.disks):
6216           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6217                                      " are 0 to %d" %
6218                                      (disk_op, len(instance.disks)))
6219
6220     return
6221
6222   def Exec(self, feedback_fn):
6223     """Modifies an instance.
6224
6225     All parameters take effect only at the next restart of the instance.
6226
6227     """
6228     # Process here the warnings from CheckPrereq, as we don't have a
6229     # feedback_fn there.
6230     for warn in self.warn:
6231       feedback_fn("WARNING: %s" % warn)
6232
6233     result = []
6234     instance = self.instance
6235     cluster = self.cluster
6236     # disk changes
6237     for disk_op, disk_dict in self.op.disks:
6238       if disk_op == constants.DDM_REMOVE:
6239         # remove the last disk
6240         device = instance.disks.pop()
6241         device_idx = len(instance.disks)
6242         for node, disk in device.ComputeNodeTree(instance.primary_node):
6243           self.cfg.SetDiskID(disk, node)
6244           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6245           if msg:
6246             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6247                             " continuing anyway", device_idx, node, msg)
6248         result.append(("disk/%d" % device_idx, "remove"))
6249       elif disk_op == constants.DDM_ADD:
6250         # add a new disk
6251         if instance.disk_template == constants.DT_FILE:
6252           file_driver, file_path = instance.disks[0].logical_id
6253           file_path = os.path.dirname(file_path)
6254         else:
6255           file_driver = file_path = None
6256         disk_idx_base = len(instance.disks)
6257         new_disk = _GenerateDiskTemplate(self,
6258                                          instance.disk_template,
6259                                          instance.name, instance.primary_node,
6260                                          instance.secondary_nodes,
6261                                          [disk_dict],
6262                                          file_path,
6263                                          file_driver,
6264                                          disk_idx_base)[0]
6265         instance.disks.append(new_disk)
6266         info = _GetInstanceInfoText(instance)
6267
6268         logging.info("Creating volume %s for instance %s",
6269                      new_disk.iv_name, instance.name)
6270         # Note: this needs to be kept in sync with _CreateDisks
6271         #HARDCODE
6272         for node in instance.all_nodes:
6273           f_create = node == instance.primary_node
6274           try:
6275             _CreateBlockDev(self, node, instance, new_disk,
6276                             f_create, info, f_create)
6277           except errors.OpExecError, err:
6278             self.LogWarning("Failed to create volume %s (%s) on"
6279                             " node %s: %s",
6280                             new_disk.iv_name, new_disk, node, err)
6281         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6282                        (new_disk.size, new_disk.mode)))
6283       else:
6284         # change a given disk
6285         instance.disks[disk_op].mode = disk_dict['mode']
6286         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6287     # NIC changes
6288     for nic_op, nic_dict in self.op.nics:
6289       if nic_op == constants.DDM_REMOVE:
6290         # remove the last nic
6291         del instance.nics[-1]
6292         result.append(("nic.%d" % len(instance.nics), "remove"))
6293       elif nic_op == constants.DDM_ADD:
6294         # mac and bridge should be set, by now
6295         mac = nic_dict['mac']
6296         ip = nic_dict.get('ip', None)
6297         nicparams = self.nic_pinst[constants.DDM_ADD]
6298         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6299         instance.nics.append(new_nic)
6300         result.append(("nic.%d" % (len(instance.nics) - 1),
6301                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6302                        (new_nic.mac, new_nic.ip,
6303                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6304                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6305                        )))
6306       else:
6307         for key in 'mac', 'ip':
6308           if key in nic_dict:
6309             setattr(instance.nics[nic_op], key, nic_dict[key])
6310         if nic_op in self.nic_pnew:
6311           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6312         for key, val in nic_dict.iteritems():
6313           result.append(("nic.%s/%d" % (key, nic_op), val))
6314
6315     # hvparams changes
6316     if self.op.hvparams:
6317       instance.hvparams = self.hv_inst
6318       for key, val in self.op.hvparams.iteritems():
6319         result.append(("hv/%s" % key, val))
6320
6321     # beparams changes
6322     if self.op.beparams:
6323       instance.beparams = self.be_inst
6324       for key, val in self.op.beparams.iteritems():
6325         result.append(("be/%s" % key, val))
6326
6327     self.cfg.Update(instance)
6328
6329     return result
6330
6331
6332 class LUQueryExports(NoHooksLU):
6333   """Query the exports list
6334
6335   """
6336   _OP_REQP = ['nodes']
6337   REQ_BGL = False
6338
6339   def ExpandNames(self):
6340     self.needed_locks = {}
6341     self.share_locks[locking.LEVEL_NODE] = 1
6342     if not self.op.nodes:
6343       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6344     else:
6345       self.needed_locks[locking.LEVEL_NODE] = \
6346         _GetWantedNodes(self, self.op.nodes)
6347
6348   def CheckPrereq(self):
6349     """Check prerequisites.
6350
6351     """
6352     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6353
6354   def Exec(self, feedback_fn):
6355     """Compute the list of all the exported system images.
6356
6357     @rtype: dict
6358     @return: a dictionary with the structure node->(export-list)
6359         where export-list is a list of the instances exported on
6360         that node.
6361
6362     """
6363     rpcresult = self.rpc.call_export_list(self.nodes)
6364     result = {}
6365     for node in rpcresult:
6366       if rpcresult[node].RemoteFailMsg():
6367         result[node] = False
6368       else:
6369         result[node] = rpcresult[node].payload
6370
6371     return result
6372
6373
6374 class LUExportInstance(LogicalUnit):
6375   """Export an instance to an image in the cluster.
6376
6377   """
6378   HPATH = "instance-export"
6379   HTYPE = constants.HTYPE_INSTANCE
6380   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6381   REQ_BGL = False
6382
6383   def ExpandNames(self):
6384     self._ExpandAndLockInstance()
6385     # FIXME: lock only instance primary and destination node
6386     #
6387     # Sad but true, for now we have do lock all nodes, as we don't know where
6388     # the previous export might be, and and in this LU we search for it and
6389     # remove it from its current node. In the future we could fix this by:
6390     #  - making a tasklet to search (share-lock all), then create the new one,
6391     #    then one to remove, after
6392     #  - removing the removal operation altoghether
6393     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6394
6395   def DeclareLocks(self, level):
6396     """Last minute lock declaration."""
6397     # All nodes are locked anyway, so nothing to do here.
6398
6399   def BuildHooksEnv(self):
6400     """Build hooks env.
6401
6402     This will run on the master, primary node and target node.
6403
6404     """
6405     env = {
6406       "EXPORT_NODE": self.op.target_node,
6407       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6408       }
6409     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6410     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6411           self.op.target_node]
6412     return env, nl, nl
6413
6414   def CheckPrereq(self):
6415     """Check prerequisites.
6416
6417     This checks that the instance and node names are valid.
6418
6419     """
6420     instance_name = self.op.instance_name
6421     self.instance = self.cfg.GetInstanceInfo(instance_name)
6422     assert self.instance is not None, \
6423           "Cannot retrieve locked instance %s" % self.op.instance_name
6424     _CheckNodeOnline(self, self.instance.primary_node)
6425
6426     self.dst_node = self.cfg.GetNodeInfo(
6427       self.cfg.ExpandNodeName(self.op.target_node))
6428
6429     if self.dst_node is None:
6430       # This is wrong node name, not a non-locked node
6431       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6432     _CheckNodeOnline(self, self.dst_node.name)
6433     _CheckNodeNotDrained(self, self.dst_node.name)
6434
6435     # instance disk type verification
6436     for disk in self.instance.disks:
6437       if disk.dev_type == constants.LD_FILE:
6438         raise errors.OpPrereqError("Export not supported for instances with"
6439                                    " file-based disks")
6440
6441   def Exec(self, feedback_fn):
6442     """Export an instance to an image in the cluster.
6443
6444     """
6445     instance = self.instance
6446     dst_node = self.dst_node
6447     src_node = instance.primary_node
6448     if self.op.shutdown:
6449       # shutdown the instance, but not the disks
6450       result = self.rpc.call_instance_shutdown(src_node, instance)
6451       msg = result.RemoteFailMsg()
6452       if msg:
6453         raise errors.OpExecError("Could not shutdown instance %s on"
6454                                  " node %s: %s" %
6455                                  (instance.name, src_node, msg))
6456
6457     vgname = self.cfg.GetVGName()
6458
6459     snap_disks = []
6460
6461     # set the disks ID correctly since call_instance_start needs the
6462     # correct drbd minor to create the symlinks
6463     for disk in instance.disks:
6464       self.cfg.SetDiskID(disk, src_node)
6465
6466     try:
6467       for disk in instance.disks:
6468         # result.payload will be a snapshot of an lvm leaf of the one we passed
6469         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6470         msg = result.RemoteFailMsg()
6471         if msg:
6472           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6473                           disk.logical_id[1], src_node, msg)
6474           snap_disks.append(False)
6475         else:
6476           disk_id = (vgname, result.payload)
6477           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6478                                  logical_id=disk_id, physical_id=disk_id,
6479                                  iv_name=disk.iv_name)
6480           snap_disks.append(new_dev)
6481
6482     finally:
6483       if self.op.shutdown and instance.admin_up:
6484         result = self.rpc.call_instance_start(src_node, instance, None, None)
6485         msg = result.RemoteFailMsg()
6486         if msg:
6487           _ShutdownInstanceDisks(self, instance)
6488           raise errors.OpExecError("Could not start instance: %s" % msg)
6489
6490     # TODO: check for size
6491
6492     cluster_name = self.cfg.GetClusterName()
6493     for idx, dev in enumerate(snap_disks):
6494       if dev:
6495         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6496                                                instance, cluster_name, idx)
6497         msg = result.RemoteFailMsg()
6498         if msg:
6499           self.LogWarning("Could not export block device %s from node %s to"
6500                           " node %s: %s", dev.logical_id[1], src_node,
6501                           dst_node.name, msg)
6502         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6503         if msg:
6504           self.LogWarning("Could not remove snapshot block device %s from node"
6505                           " %s: %s", dev.logical_id[1], src_node, msg)
6506
6507     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6508     msg = result.RemoteFailMsg()
6509     if msg:
6510       self.LogWarning("Could not finalize export for instance %s"
6511                       " on node %s: %s", instance.name, dst_node.name, msg)
6512
6513     nodelist = self.cfg.GetNodeList()
6514     nodelist.remove(dst_node.name)
6515
6516     # on one-node clusters nodelist will be empty after the removal
6517     # if we proceed the backup would be removed because OpQueryExports
6518     # substitutes an empty list with the full cluster node list.
6519     if nodelist:
6520       exportlist = self.rpc.call_export_list(nodelist)
6521       for node in exportlist:
6522         if exportlist[node].RemoteFailMsg():
6523           continue
6524         if instance.name in exportlist[node].payload:
6525           if not self.rpc.call_export_remove(node, instance.name):
6526             self.LogWarning("Could not remove older export for instance %s"
6527                             " on node %s", instance.name, node)
6528
6529
6530 class LURemoveExport(NoHooksLU):
6531   """Remove exports related to the named instance.
6532
6533   """
6534   _OP_REQP = ["instance_name"]
6535   REQ_BGL = False
6536
6537   def ExpandNames(self):
6538     self.needed_locks = {}
6539     # We need all nodes to be locked in order for RemoveExport to work, but we
6540     # don't need to lock the instance itself, as nothing will happen to it (and
6541     # we can remove exports also for a removed instance)
6542     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6543
6544   def CheckPrereq(self):
6545     """Check prerequisites.
6546     """
6547     pass
6548
6549   def Exec(self, feedback_fn):
6550     """Remove any export.
6551
6552     """
6553     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6554     # If the instance was not found we'll try with the name that was passed in.
6555     # This will only work if it was an FQDN, though.
6556     fqdn_warn = False
6557     if not instance_name:
6558       fqdn_warn = True
6559       instance_name = self.op.instance_name
6560
6561     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6562     exportlist = self.rpc.call_export_list(locked_nodes)
6563     found = False
6564     for node in exportlist:
6565       msg = exportlist[node].RemoteFailMsg()
6566       if msg:
6567         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6568         continue
6569       if instance_name in exportlist[node].payload:
6570         found = True
6571         result = self.rpc.call_export_remove(node, instance_name)
6572         if result.failed or not result.data:
6573           logging.error("Could not remove export for instance %s"
6574                         " on node %s", instance_name, node)
6575
6576     if fqdn_warn and not found:
6577       feedback_fn("Export not found. If trying to remove an export belonging"
6578                   " to a deleted instance please use its Fully Qualified"
6579                   " Domain Name.")
6580
6581
6582 class TagsLU(NoHooksLU):
6583   """Generic tags LU.
6584
6585   This is an abstract class which is the parent of all the other tags LUs.
6586
6587   """
6588
6589   def ExpandNames(self):
6590     self.needed_locks = {}
6591     if self.op.kind == constants.TAG_NODE:
6592       name = self.cfg.ExpandNodeName(self.op.name)
6593       if name is None:
6594         raise errors.OpPrereqError("Invalid node name (%s)" %
6595                                    (self.op.name,))
6596       self.op.name = name
6597       self.needed_locks[locking.LEVEL_NODE] = name
6598     elif self.op.kind == constants.TAG_INSTANCE:
6599       name = self.cfg.ExpandInstanceName(self.op.name)
6600       if name is None:
6601         raise errors.OpPrereqError("Invalid instance name (%s)" %
6602                                    (self.op.name,))
6603       self.op.name = name
6604       self.needed_locks[locking.LEVEL_INSTANCE] = name
6605
6606   def CheckPrereq(self):
6607     """Check prerequisites.
6608
6609     """
6610     if self.op.kind == constants.TAG_CLUSTER:
6611       self.target = self.cfg.GetClusterInfo()
6612     elif self.op.kind == constants.TAG_NODE:
6613       self.target = self.cfg.GetNodeInfo(self.op.name)
6614     elif self.op.kind == constants.TAG_INSTANCE:
6615       self.target = self.cfg.GetInstanceInfo(self.op.name)
6616     else:
6617       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6618                                  str(self.op.kind))
6619
6620
6621 class LUGetTags(TagsLU):
6622   """Returns the tags of a given object.
6623
6624   """
6625   _OP_REQP = ["kind", "name"]
6626   REQ_BGL = False
6627
6628   def Exec(self, feedback_fn):
6629     """Returns the tag list.
6630
6631     """
6632     return list(self.target.GetTags())
6633
6634
6635 class LUSearchTags(NoHooksLU):
6636   """Searches the tags for a given pattern.
6637
6638   """
6639   _OP_REQP = ["pattern"]
6640   REQ_BGL = False
6641
6642   def ExpandNames(self):
6643     self.needed_locks = {}
6644
6645   def CheckPrereq(self):
6646     """Check prerequisites.
6647
6648     This checks the pattern passed for validity by compiling it.
6649
6650     """
6651     try:
6652       self.re = re.compile(self.op.pattern)
6653     except re.error, err:
6654       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6655                                  (self.op.pattern, err))
6656
6657   def Exec(self, feedback_fn):
6658     """Returns the tag list.
6659
6660     """
6661     cfg = self.cfg
6662     tgts = [("/cluster", cfg.GetClusterInfo())]
6663     ilist = cfg.GetAllInstancesInfo().values()
6664     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6665     nlist = cfg.GetAllNodesInfo().values()
6666     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6667     results = []
6668     for path, target in tgts:
6669       for tag in target.GetTags():
6670         if self.re.search(tag):
6671           results.append((path, tag))
6672     return results
6673
6674
6675 class LUAddTags(TagsLU):
6676   """Sets a tag on a given object.
6677
6678   """
6679   _OP_REQP = ["kind", "name", "tags"]
6680   REQ_BGL = False
6681
6682   def CheckPrereq(self):
6683     """Check prerequisites.
6684
6685     This checks the type and length of the tag name and value.
6686
6687     """
6688     TagsLU.CheckPrereq(self)
6689     for tag in self.op.tags:
6690       objects.TaggableObject.ValidateTag(tag)
6691
6692   def Exec(self, feedback_fn):
6693     """Sets the tag.
6694
6695     """
6696     try:
6697       for tag in self.op.tags:
6698         self.target.AddTag(tag)
6699     except errors.TagError, err:
6700       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6701     try:
6702       self.cfg.Update(self.target)
6703     except errors.ConfigurationError:
6704       raise errors.OpRetryError("There has been a modification to the"
6705                                 " config file and the operation has been"
6706                                 " aborted. Please retry.")
6707
6708
6709 class LUDelTags(TagsLU):
6710   """Delete a list of tags from a given object.
6711
6712   """
6713   _OP_REQP = ["kind", "name", "tags"]
6714   REQ_BGL = False
6715
6716   def CheckPrereq(self):
6717     """Check prerequisites.
6718
6719     This checks that we have the given tag.
6720
6721     """
6722     TagsLU.CheckPrereq(self)
6723     for tag in self.op.tags:
6724       objects.TaggableObject.ValidateTag(tag)
6725     del_tags = frozenset(self.op.tags)
6726     cur_tags = self.target.GetTags()
6727     if not del_tags <= cur_tags:
6728       diff_tags = del_tags - cur_tags
6729       diff_names = ["'%s'" % tag for tag in diff_tags]
6730       diff_names.sort()
6731       raise errors.OpPrereqError("Tag(s) %s not found" %
6732                                  (",".join(diff_names)))
6733
6734   def Exec(self, feedback_fn):
6735     """Remove the tag from the object.
6736
6737     """
6738     for tag in self.op.tags:
6739       self.target.RemoveTag(tag)
6740     try:
6741       self.cfg.Update(self.target)
6742     except errors.ConfigurationError:
6743       raise errors.OpRetryError("There has been a modification to the"
6744                                 " config file and the operation has been"
6745                                 " aborted. Please retry.")
6746
6747
6748 class LUTestDelay(NoHooksLU):
6749   """Sleep for a specified amount of time.
6750
6751   This LU sleeps on the master and/or nodes for a specified amount of
6752   time.
6753
6754   """
6755   _OP_REQP = ["duration", "on_master", "on_nodes"]
6756   REQ_BGL = False
6757
6758   def ExpandNames(self):
6759     """Expand names and set required locks.
6760
6761     This expands the node list, if any.
6762
6763     """
6764     self.needed_locks = {}
6765     if self.op.on_nodes:
6766       # _GetWantedNodes can be used here, but is not always appropriate to use
6767       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6768       # more information.
6769       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6770       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6771
6772   def CheckPrereq(self):
6773     """Check prerequisites.
6774
6775     """
6776
6777   def Exec(self, feedback_fn):
6778     """Do the actual sleep.
6779
6780     """
6781     if self.op.on_master:
6782       if not utils.TestDelay(self.op.duration):
6783         raise errors.OpExecError("Error during master delay test")
6784     if self.op.on_nodes:
6785       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6786       if not result:
6787         raise errors.OpExecError("Complete failure from rpc call")
6788       for node, node_result in result.items():
6789         node_result.Raise()
6790         if not node_result.data:
6791           raise errors.OpExecError("Failure during rpc call to node %s,"
6792                                    " result: %s" % (node, node_result.data))
6793
6794
6795 class IAllocator(object):
6796   """IAllocator framework.
6797
6798   An IAllocator instance has three sets of attributes:
6799     - cfg that is needed to query the cluster
6800     - input data (all members of the _KEYS class attribute are required)
6801     - four buffer attributes (in|out_data|text), that represent the
6802       input (to the external script) in text and data structure format,
6803       and the output from it, again in two formats
6804     - the result variables from the script (success, info, nodes) for
6805       easy usage
6806
6807   """
6808   _ALLO_KEYS = [
6809     "mem_size", "disks", "disk_template",
6810     "os", "tags", "nics", "vcpus", "hypervisor",
6811     ]
6812   _RELO_KEYS = [
6813     "relocate_from",
6814     ]
6815
6816   def __init__(self, lu, mode, name, **kwargs):
6817     self.lu = lu
6818     # init buffer variables
6819     self.in_text = self.out_text = self.in_data = self.out_data = None
6820     # init all input fields so that pylint is happy
6821     self.mode = mode
6822     self.name = name
6823     self.mem_size = self.disks = self.disk_template = None
6824     self.os = self.tags = self.nics = self.vcpus = None
6825     self.hypervisor = None
6826     self.relocate_from = None
6827     # computed fields
6828     self.required_nodes = None
6829     # init result fields
6830     self.success = self.info = self.nodes = None
6831     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6832       keyset = self._ALLO_KEYS
6833     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6834       keyset = self._RELO_KEYS
6835     else:
6836       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6837                                    " IAllocator" % self.mode)
6838     for key in kwargs:
6839       if key not in keyset:
6840         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6841                                      " IAllocator" % key)
6842       setattr(self, key, kwargs[key])
6843     for key in keyset:
6844       if key not in kwargs:
6845         raise errors.ProgrammerError("Missing input parameter '%s' to"
6846                                      " IAllocator" % key)
6847     self._BuildInputData()
6848
6849   def _ComputeClusterData(self):
6850     """Compute the generic allocator input data.
6851
6852     This is the data that is independent of the actual operation.
6853
6854     """
6855     cfg = self.lu.cfg
6856     cluster_info = cfg.GetClusterInfo()
6857     # cluster data
6858     data = {
6859       "version": constants.IALLOCATOR_VERSION,
6860       "cluster_name": cfg.GetClusterName(),
6861       "cluster_tags": list(cluster_info.GetTags()),
6862       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6863       # we don't have job IDs
6864       }
6865     iinfo = cfg.GetAllInstancesInfo().values()
6866     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6867
6868     # node data
6869     node_results = {}
6870     node_list = cfg.GetNodeList()
6871
6872     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6873       hypervisor_name = self.hypervisor
6874     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6875       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6876
6877     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6878                                            hypervisor_name)
6879     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6880                        cluster_info.enabled_hypervisors)
6881     for nname, nresult in node_data.items():
6882       # first fill in static (config-based) values
6883       ninfo = cfg.GetNodeInfo(nname)
6884       pnr = {
6885         "tags": list(ninfo.GetTags()),
6886         "primary_ip": ninfo.primary_ip,
6887         "secondary_ip": ninfo.secondary_ip,
6888         "offline": ninfo.offline,
6889         "drained": ninfo.drained,
6890         "master_candidate": ninfo.master_candidate,
6891         }
6892
6893       if not ninfo.offline:
6894         nresult.Raise()
6895         if not isinstance(nresult.data, dict):
6896           raise errors.OpExecError("Can't get data for node %s" % nname)
6897         remote_info = nresult.data
6898         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6899                      'vg_size', 'vg_free', 'cpu_total']:
6900           if attr not in remote_info:
6901             raise errors.OpExecError("Node '%s' didn't return attribute"
6902                                      " '%s'" % (nname, attr))
6903           try:
6904             remote_info[attr] = int(remote_info[attr])
6905           except ValueError, err:
6906             raise errors.OpExecError("Node '%s' returned invalid value"
6907                                      " for '%s': %s" % (nname, attr, err))
6908         # compute memory used by primary instances
6909         i_p_mem = i_p_up_mem = 0
6910         for iinfo, beinfo in i_list:
6911           if iinfo.primary_node == nname:
6912             i_p_mem += beinfo[constants.BE_MEMORY]
6913             if iinfo.name not in node_iinfo[nname].data:
6914               i_used_mem = 0
6915             else:
6916               i_used_mem = int(node_iinfo[nname].data[iinfo.name]['memory'])
6917             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6918             remote_info['memory_free'] -= max(0, i_mem_diff)
6919
6920             if iinfo.admin_up:
6921               i_p_up_mem += beinfo[constants.BE_MEMORY]
6922
6923         # compute memory used by instances
6924         pnr_dyn = {
6925           "total_memory": remote_info['memory_total'],
6926           "reserved_memory": remote_info['memory_dom0'],
6927           "free_memory": remote_info['memory_free'],
6928           "total_disk": remote_info['vg_size'],
6929           "free_disk": remote_info['vg_free'],
6930           "total_cpus": remote_info['cpu_total'],
6931           "i_pri_memory": i_p_mem,
6932           "i_pri_up_memory": i_p_up_mem,
6933           }
6934         pnr.update(pnr_dyn)
6935
6936       node_results[nname] = pnr
6937     data["nodes"] = node_results
6938
6939     # instance data
6940     instance_data = {}
6941     for iinfo, beinfo in i_list:
6942       nic_data = []
6943       for nic in iinfo.nics:
6944         filled_params = objects.FillDict(
6945             cluster_info.nicparams[constants.PP_DEFAULT],
6946             nic.nicparams)
6947         nic_dict = {"mac": nic.mac,
6948                     "ip": nic.ip,
6949                     "mode": filled_params[constants.NIC_MODE],
6950                     "link": filled_params[constants.NIC_LINK],
6951                    }
6952         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
6953           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
6954         nic_data.append(nic_dict)
6955       pir = {
6956         "tags": list(iinfo.GetTags()),
6957         "admin_up": iinfo.admin_up,
6958         "vcpus": beinfo[constants.BE_VCPUS],
6959         "memory": beinfo[constants.BE_MEMORY],
6960         "os": iinfo.os,
6961         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
6962         "nics": nic_data,
6963         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
6964         "disk_template": iinfo.disk_template,
6965         "hypervisor": iinfo.hypervisor,
6966         }
6967       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
6968                                                  pir["disks"])
6969       instance_data[iinfo.name] = pir
6970
6971     data["instances"] = instance_data
6972
6973     self.in_data = data
6974
6975   def _AddNewInstance(self):
6976     """Add new instance data to allocator structure.
6977
6978     This in combination with _AllocatorGetClusterData will create the
6979     correct structure needed as input for the allocator.
6980
6981     The checks for the completeness of the opcode must have already been
6982     done.
6983
6984     """
6985     data = self.in_data
6986
6987     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
6988
6989     if self.disk_template in constants.DTS_NET_MIRROR:
6990       self.required_nodes = 2
6991     else:
6992       self.required_nodes = 1
6993     request = {
6994       "type": "allocate",
6995       "name": self.name,
6996       "disk_template": self.disk_template,
6997       "tags": self.tags,
6998       "os": self.os,
6999       "vcpus": self.vcpus,
7000       "memory": self.mem_size,
7001       "disks": self.disks,
7002       "disk_space_total": disk_space,
7003       "nics": self.nics,
7004       "required_nodes": self.required_nodes,
7005       }
7006     data["request"] = request
7007
7008   def _AddRelocateInstance(self):
7009     """Add relocate instance data to allocator structure.
7010
7011     This in combination with _IAllocatorGetClusterData will create the
7012     correct structure needed as input for the allocator.
7013
7014     The checks for the completeness of the opcode must have already been
7015     done.
7016
7017     """
7018     instance = self.lu.cfg.GetInstanceInfo(self.name)
7019     if instance is None:
7020       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7021                                    " IAllocator" % self.name)
7022
7023     if instance.disk_template not in constants.DTS_NET_MIRROR:
7024       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7025
7026     if len(instance.secondary_nodes) != 1:
7027       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7028
7029     self.required_nodes = 1
7030     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7031     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7032
7033     request = {
7034       "type": "relocate",
7035       "name": self.name,
7036       "disk_space_total": disk_space,
7037       "required_nodes": self.required_nodes,
7038       "relocate_from": self.relocate_from,
7039       }
7040     self.in_data["request"] = request
7041
7042   def _BuildInputData(self):
7043     """Build input data structures.
7044
7045     """
7046     self._ComputeClusterData()
7047
7048     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7049       self._AddNewInstance()
7050     else:
7051       self._AddRelocateInstance()
7052
7053     self.in_text = serializer.Dump(self.in_data)
7054
7055   def Run(self, name, validate=True, call_fn=None):
7056     """Run an instance allocator and return the results.
7057
7058     """
7059     if call_fn is None:
7060       call_fn = self.lu.rpc.call_iallocator_runner
7061     data = self.in_text
7062
7063     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7064     result.Raise()
7065
7066     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7067       raise errors.OpExecError("Invalid result from master iallocator runner")
7068
7069     rcode, stdout, stderr, fail = result.data
7070
7071     if rcode == constants.IARUN_NOTFOUND:
7072       raise errors.OpExecError("Can't find allocator '%s'" % name)
7073     elif rcode == constants.IARUN_FAILURE:
7074       raise errors.OpExecError("Instance allocator call failed: %s,"
7075                                " output: %s" % (fail, stdout+stderr))
7076     self.out_text = stdout
7077     if validate:
7078       self._ValidateResult()
7079
7080   def _ValidateResult(self):
7081     """Process the allocator results.
7082
7083     This will process and if successful save the result in
7084     self.out_data and the other parameters.
7085
7086     """
7087     try:
7088       rdict = serializer.Load(self.out_text)
7089     except Exception, err:
7090       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7091
7092     if not isinstance(rdict, dict):
7093       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7094
7095     for key in "success", "info", "nodes":
7096       if key not in rdict:
7097         raise errors.OpExecError("Can't parse iallocator results:"
7098                                  " missing key '%s'" % key)
7099       setattr(self, key, rdict[key])
7100
7101     if not isinstance(rdict["nodes"], list):
7102       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7103                                " is not a list")
7104     self.out_data = rdict
7105
7106
7107 class LUTestAllocator(NoHooksLU):
7108   """Run allocator tests.
7109
7110   This LU runs the allocator tests
7111
7112   """
7113   _OP_REQP = ["direction", "mode", "name"]
7114
7115   def CheckPrereq(self):
7116     """Check prerequisites.
7117
7118     This checks the opcode parameters depending on the director and mode test.
7119
7120     """
7121     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7122       for attr in ["name", "mem_size", "disks", "disk_template",
7123                    "os", "tags", "nics", "vcpus"]:
7124         if not hasattr(self.op, attr):
7125           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7126                                      attr)
7127       iname = self.cfg.ExpandInstanceName(self.op.name)
7128       if iname is not None:
7129         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7130                                    iname)
7131       if not isinstance(self.op.nics, list):
7132         raise errors.OpPrereqError("Invalid parameter 'nics'")
7133       for row in self.op.nics:
7134         if (not isinstance(row, dict) or
7135             "mac" not in row or
7136             "ip" not in row or
7137             "bridge" not in row):
7138           raise errors.OpPrereqError("Invalid contents of the"
7139                                      " 'nics' parameter")
7140       if not isinstance(self.op.disks, list):
7141         raise errors.OpPrereqError("Invalid parameter 'disks'")
7142       for row in self.op.disks:
7143         if (not isinstance(row, dict) or
7144             "size" not in row or
7145             not isinstance(row["size"], int) or
7146             "mode" not in row or
7147             row["mode"] not in ['r', 'w']):
7148           raise errors.OpPrereqError("Invalid contents of the"
7149                                      " 'disks' parameter")
7150       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7151         self.op.hypervisor = self.cfg.GetHypervisorType()
7152     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7153       if not hasattr(self.op, "name"):
7154         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7155       fname = self.cfg.ExpandInstanceName(self.op.name)
7156       if fname is None:
7157         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7158                                    self.op.name)
7159       self.op.name = fname
7160       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7161     else:
7162       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7163                                  self.op.mode)
7164
7165     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7166       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7167         raise errors.OpPrereqError("Missing allocator name")
7168     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7169       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7170                                  self.op.direction)
7171
7172   def Exec(self, feedback_fn):
7173     """Run the allocator test.
7174
7175     """
7176     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7177       ial = IAllocator(self,
7178                        mode=self.op.mode,
7179                        name=self.op.name,
7180                        mem_size=self.op.mem_size,
7181                        disks=self.op.disks,
7182                        disk_template=self.op.disk_template,
7183                        os=self.op.os,
7184                        tags=self.op.tags,
7185                        nics=self.op.nics,
7186                        vcpus=self.op.vcpus,
7187                        hypervisor=self.op.hypervisor,
7188                        )
7189     else:
7190       ial = IAllocator(self,
7191                        mode=self.op.mode,
7192                        name=self.op.name,
7193                        relocate_from=list(self.relocate_from),
7194                        )
7195
7196     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7197       result = ial.in_text
7198     else:
7199       ial.Run(self.op.allocator, validate=False)
7200       result = ial.out_text
7201     return result