Convert node_volumes rpc to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     @rtype: tuple of three items
1293     @return: a tuple of (dict of node-to-node_error, list of instances
1294         which need activate-disks, dict of instance: (node, volume) for
1295         missing volumes
1296
1297     """
1298     result = res_nodes, res_instances, res_missing = {}, [], {}
1299
1300     vg_name = self.cfg.GetVGName()
1301     nodes = utils.NiceSort(self.cfg.GetNodeList())
1302     instances = [self.cfg.GetInstanceInfo(name)
1303                  for name in self.cfg.GetInstanceList()]
1304
1305     nv_dict = {}
1306     for inst in instances:
1307       inst_lvs = {}
1308       if (not inst.admin_up or
1309           inst.disk_template not in constants.DTS_NET_MIRROR):
1310         continue
1311       inst.MapLVsByNode(inst_lvs)
1312       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313       for node, vol_list in inst_lvs.iteritems():
1314         for vol in vol_list:
1315           nv_dict[(node, vol)] = inst
1316
1317     if not nv_dict:
1318       return result
1319
1320     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321
1322     to_act = set()
1323     for node in nodes:
1324       # node_volume
1325       node_res = node_lvs[node]
1326       if node_res.offline:
1327         continue
1328       msg = node_res.RemoteFailMsg()
1329       if msg:
1330         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331         res_nodes[node] = msg
1332         continue
1333
1334       lvs = node_res.payload
1335       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336         inst = nv_dict.pop((node, lv_name), None)
1337         if (not lv_online and inst is not None
1338             and inst.name not in res_instances):
1339           res_instances.append(inst.name)
1340
1341     # any leftover items in nv_dict are missing LVs, let's arrange the
1342     # data better
1343     for key, inst in nv_dict.iteritems():
1344       if inst.name not in res_missing:
1345         res_missing[inst.name] = []
1346       res_missing[inst.name].append(key)
1347
1348     return result
1349
1350
1351 class LURenameCluster(LogicalUnit):
1352   """Rename the cluster.
1353
1354   """
1355   HPATH = "cluster-rename"
1356   HTYPE = constants.HTYPE_CLUSTER
1357   _OP_REQP = ["name"]
1358
1359   def BuildHooksEnv(self):
1360     """Build hooks env.
1361
1362     """
1363     env = {
1364       "OP_TARGET": self.cfg.GetClusterName(),
1365       "NEW_NAME": self.op.name,
1366       }
1367     mn = self.cfg.GetMasterNode()
1368     return env, [mn], [mn]
1369
1370   def CheckPrereq(self):
1371     """Verify that the passed name is a valid one.
1372
1373     """
1374     hostname = utils.HostInfo(self.op.name)
1375
1376     new_name = hostname.name
1377     self.ip = new_ip = hostname.ip
1378     old_name = self.cfg.GetClusterName()
1379     old_ip = self.cfg.GetMasterIP()
1380     if new_name == old_name and new_ip == old_ip:
1381       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382                                  " cluster has changed")
1383     if new_ip != old_ip:
1384       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386                                    " reachable on the network. Aborting." %
1387                                    new_ip)
1388
1389     self.op.name = new_name
1390
1391   def Exec(self, feedback_fn):
1392     """Rename the cluster.
1393
1394     """
1395     clustername = self.op.name
1396     ip = self.ip
1397
1398     # shutdown the master IP
1399     master = self.cfg.GetMasterNode()
1400     result = self.rpc.call_node_stop_master(master, False)
1401     msg = result.RemoteFailMsg()
1402     if msg:
1403       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1404
1405     try:
1406       cluster = self.cfg.GetClusterInfo()
1407       cluster.cluster_name = clustername
1408       cluster.master_ip = ip
1409       self.cfg.Update(cluster)
1410
1411       # update the known hosts file
1412       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1413       node_list = self.cfg.GetNodeList()
1414       try:
1415         node_list.remove(master)
1416       except ValueError:
1417         pass
1418       result = self.rpc.call_upload_file(node_list,
1419                                          constants.SSH_KNOWN_HOSTS_FILE)
1420       for to_node, to_result in result.iteritems():
1421          msg = to_result.RemoteFailMsg()
1422          if msg:
1423            msg = ("Copy of file %s to node %s failed: %s" %
1424                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1425            self.proc.LogWarning(msg)
1426
1427     finally:
1428       result = self.rpc.call_node_start_master(master, False)
1429       msg = result.RemoteFailMsg()
1430       if msg:
1431         self.LogWarning("Could not re-enable the master role on"
1432                         " the master, please restart manually: %s", msg)
1433
1434
1435 def _RecursiveCheckIfLVMBased(disk):
1436   """Check if the given disk or its children are lvm-based.
1437
1438   @type disk: L{objects.Disk}
1439   @param disk: the disk to check
1440   @rtype: booleean
1441   @return: boolean indicating whether a LD_LV dev_type was found or not
1442
1443   """
1444   if disk.children:
1445     for chdisk in disk.children:
1446       if _RecursiveCheckIfLVMBased(chdisk):
1447         return True
1448   return disk.dev_type == constants.LD_LV
1449
1450
1451 class LUSetClusterParams(LogicalUnit):
1452   """Change the parameters of the cluster.
1453
1454   """
1455   HPATH = "cluster-modify"
1456   HTYPE = constants.HTYPE_CLUSTER
1457   _OP_REQP = []
1458   REQ_BGL = False
1459
1460   def CheckArguments(self):
1461     """Check parameters
1462
1463     """
1464     if not hasattr(self.op, "candidate_pool_size"):
1465       self.op.candidate_pool_size = None
1466     if self.op.candidate_pool_size is not None:
1467       try:
1468         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1469       except (ValueError, TypeError), err:
1470         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1471                                    str(err))
1472       if self.op.candidate_pool_size < 1:
1473         raise errors.OpPrereqError("At least one master candidate needed")
1474
1475   def ExpandNames(self):
1476     # FIXME: in the future maybe other cluster params won't require checking on
1477     # all nodes to be modified.
1478     self.needed_locks = {
1479       locking.LEVEL_NODE: locking.ALL_SET,
1480     }
1481     self.share_locks[locking.LEVEL_NODE] = 1
1482
1483   def BuildHooksEnv(self):
1484     """Build hooks env.
1485
1486     """
1487     env = {
1488       "OP_TARGET": self.cfg.GetClusterName(),
1489       "NEW_VG_NAME": self.op.vg_name,
1490       }
1491     mn = self.cfg.GetMasterNode()
1492     return env, [mn], [mn]
1493
1494   def CheckPrereq(self):
1495     """Check prerequisites.
1496
1497     This checks whether the given params don't conflict and
1498     if the given volume group is valid.
1499
1500     """
1501     if self.op.vg_name is not None and not self.op.vg_name:
1502       instances = self.cfg.GetAllInstancesInfo().values()
1503       for inst in instances:
1504         for disk in inst.disks:
1505           if _RecursiveCheckIfLVMBased(disk):
1506             raise errors.OpPrereqError("Cannot disable lvm storage while"
1507                                        " lvm-based instances exist")
1508
1509     node_list = self.acquired_locks[locking.LEVEL_NODE]
1510
1511     # if vg_name not None, checks given volume group on all nodes
1512     if self.op.vg_name:
1513       vglist = self.rpc.call_vg_list(node_list)
1514       for node in node_list:
1515         msg = vglist[node].RemoteFailMsg()
1516         if msg:
1517           # ignoring down node
1518           self.LogWarning("Error while gathering data on node %s"
1519                           " (ignoring node): %s", node, msg)
1520           continue
1521         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1522                                               self.op.vg_name,
1523                                               constants.MIN_VG_SIZE)
1524         if vgstatus:
1525           raise errors.OpPrereqError("Error on node '%s': %s" %
1526                                      (node, vgstatus))
1527
1528     self.cluster = cluster = self.cfg.GetClusterInfo()
1529     # validate params changes
1530     if self.op.beparams:
1531       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1532       self.new_beparams = objects.FillDict(
1533         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1534
1535     if self.op.nicparams:
1536       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1537       self.new_nicparams = objects.FillDict(
1538         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1539       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1540
1541     # hypervisor list/parameters
1542     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1543     if self.op.hvparams:
1544       if not isinstance(self.op.hvparams, dict):
1545         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1546       for hv_name, hv_dict in self.op.hvparams.items():
1547         if hv_name not in self.new_hvparams:
1548           self.new_hvparams[hv_name] = hv_dict
1549         else:
1550           self.new_hvparams[hv_name].update(hv_dict)
1551
1552     if self.op.enabled_hypervisors is not None:
1553       self.hv_list = self.op.enabled_hypervisors
1554     else:
1555       self.hv_list = cluster.enabled_hypervisors
1556
1557     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1558       # either the enabled list has changed, or the parameters have, validate
1559       for hv_name, hv_params in self.new_hvparams.items():
1560         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1561             (self.op.enabled_hypervisors and
1562              hv_name in self.op.enabled_hypervisors)):
1563           # either this is a new hypervisor, or its parameters have changed
1564           hv_class = hypervisor.GetHypervisor(hv_name)
1565           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1566           hv_class.CheckParameterSyntax(hv_params)
1567           _CheckHVParams(self, node_list, hv_name, hv_params)
1568
1569   def Exec(self, feedback_fn):
1570     """Change the parameters of the cluster.
1571
1572     """
1573     if self.op.vg_name is not None:
1574       new_volume = self.op.vg_name
1575       if not new_volume:
1576         new_volume = None
1577       if new_volume != self.cfg.GetVGName():
1578         self.cfg.SetVGName(new_volume)
1579       else:
1580         feedback_fn("Cluster LVM configuration already in desired"
1581                     " state, not changing")
1582     if self.op.hvparams:
1583       self.cluster.hvparams = self.new_hvparams
1584     if self.op.enabled_hypervisors is not None:
1585       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1586     if self.op.beparams:
1587       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1588     if self.op.nicparams:
1589       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1590
1591     if self.op.candidate_pool_size is not None:
1592       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1593
1594     self.cfg.Update(self.cluster)
1595
1596     # we want to update nodes after the cluster so that if any errors
1597     # happen, we have recorded and saved the cluster info
1598     if self.op.candidate_pool_size is not None:
1599       _AdjustCandidatePool(self)
1600
1601
1602 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1603   """Distribute additional files which are part of the cluster configuration.
1604
1605   ConfigWriter takes care of distributing the config and ssconf files, but
1606   there are more files which should be distributed to all nodes. This function
1607   makes sure those are copied.
1608
1609   @param lu: calling logical unit
1610   @param additional_nodes: list of nodes not in the config to distribute to
1611
1612   """
1613   # 1. Gather target nodes
1614   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1615   dist_nodes = lu.cfg.GetNodeList()
1616   if additional_nodes is not None:
1617     dist_nodes.extend(additional_nodes)
1618   if myself.name in dist_nodes:
1619     dist_nodes.remove(myself.name)
1620   # 2. Gather files to distribute
1621   dist_files = set([constants.ETC_HOSTS,
1622                     constants.SSH_KNOWN_HOSTS_FILE,
1623                     constants.RAPI_CERT_FILE,
1624                     constants.RAPI_USERS_FILE,
1625                    ])
1626
1627   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1628   for hv_name in enabled_hypervisors:
1629     hv_class = hypervisor.GetHypervisor(hv_name)
1630     dist_files.update(hv_class.GetAncillaryFiles())
1631
1632   # 3. Perform the files upload
1633   for fname in dist_files:
1634     if os.path.exists(fname):
1635       result = lu.rpc.call_upload_file(dist_nodes, fname)
1636       for to_node, to_result in result.items():
1637          msg = to_result.RemoteFailMsg()
1638          if msg:
1639            msg = ("Copy of file %s to node %s failed: %s" %
1640                    (fname, to_node, msg))
1641            lu.proc.LogWarning(msg)
1642
1643
1644 class LURedistributeConfig(NoHooksLU):
1645   """Force the redistribution of cluster configuration.
1646
1647   This is a very simple LU.
1648
1649   """
1650   _OP_REQP = []
1651   REQ_BGL = False
1652
1653   def ExpandNames(self):
1654     self.needed_locks = {
1655       locking.LEVEL_NODE: locking.ALL_SET,
1656     }
1657     self.share_locks[locking.LEVEL_NODE] = 1
1658
1659   def CheckPrereq(self):
1660     """Check prerequisites.
1661
1662     """
1663
1664   def Exec(self, feedback_fn):
1665     """Redistribute the configuration.
1666
1667     """
1668     self.cfg.Update(self.cfg.GetClusterInfo())
1669     _RedistributeAncillaryFiles(self)
1670
1671
1672 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1673   """Sleep and poll for an instance's disk to sync.
1674
1675   """
1676   if not instance.disks:
1677     return True
1678
1679   if not oneshot:
1680     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1681
1682   node = instance.primary_node
1683
1684   for dev in instance.disks:
1685     lu.cfg.SetDiskID(dev, node)
1686
1687   retries = 0
1688   while True:
1689     max_time = 0
1690     done = True
1691     cumul_degraded = False
1692     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1693     msg = rstats.RemoteFailMsg()
1694     if msg:
1695       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1696       retries += 1
1697       if retries >= 10:
1698         raise errors.RemoteError("Can't contact node %s for mirror data,"
1699                                  " aborting." % node)
1700       time.sleep(6)
1701       continue
1702     rstats = rstats.payload
1703     retries = 0
1704     for i, mstat in enumerate(rstats):
1705       if mstat is None:
1706         lu.LogWarning("Can't compute data for node %s/%s",
1707                            node, instance.disks[i].iv_name)
1708         continue
1709       # we ignore the ldisk parameter
1710       perc_done, est_time, is_degraded, _ = mstat
1711       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1712       if perc_done is not None:
1713         done = False
1714         if est_time is not None:
1715           rem_time = "%d estimated seconds remaining" % est_time
1716           max_time = est_time
1717         else:
1718           rem_time = "no time estimate"
1719         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1720                         (instance.disks[i].iv_name, perc_done, rem_time))
1721     if done or oneshot:
1722       break
1723
1724     time.sleep(min(60, max_time))
1725
1726   if done:
1727     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1728   return not cumul_degraded
1729
1730
1731 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1732   """Check that mirrors are not degraded.
1733
1734   The ldisk parameter, if True, will change the test from the
1735   is_degraded attribute (which represents overall non-ok status for
1736   the device(s)) to the ldisk (representing the local storage status).
1737
1738   """
1739   lu.cfg.SetDiskID(dev, node)
1740   if ldisk:
1741     idx = 6
1742   else:
1743     idx = 5
1744
1745   result = True
1746   if on_primary or dev.AssembleOnSecondary():
1747     rstats = lu.rpc.call_blockdev_find(node, dev)
1748     msg = rstats.RemoteFailMsg()
1749     if msg:
1750       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1751       result = False
1752     elif not rstats.payload:
1753       lu.LogWarning("Can't find disk on node %s", node)
1754       result = False
1755     else:
1756       result = result and (not rstats.payload[idx])
1757   if dev.children:
1758     for child in dev.children:
1759       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1760
1761   return result
1762
1763
1764 class LUDiagnoseOS(NoHooksLU):
1765   """Logical unit for OS diagnose/query.
1766
1767   """
1768   _OP_REQP = ["output_fields", "names"]
1769   REQ_BGL = False
1770   _FIELDS_STATIC = utils.FieldSet()
1771   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1772
1773   def ExpandNames(self):
1774     if self.op.names:
1775       raise errors.OpPrereqError("Selective OS query not supported")
1776
1777     _CheckOutputFields(static=self._FIELDS_STATIC,
1778                        dynamic=self._FIELDS_DYNAMIC,
1779                        selected=self.op.output_fields)
1780
1781     # Lock all nodes, in shared mode
1782     # Temporary removal of locks, should be reverted later
1783     # TODO: reintroduce locks when they are lighter-weight
1784     self.needed_locks = {}
1785     #self.share_locks[locking.LEVEL_NODE] = 1
1786     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1787
1788   def CheckPrereq(self):
1789     """Check prerequisites.
1790
1791     """
1792
1793   @staticmethod
1794   def _DiagnoseByOS(node_list, rlist):
1795     """Remaps a per-node return list into an a per-os per-node dictionary
1796
1797     @param node_list: a list with the names of all nodes
1798     @param rlist: a map with node names as keys and OS objects as values
1799
1800     @rtype: dict
1801     @return: a dictionary with osnames as keys and as value another map, with
1802         nodes as keys and list of OS objects as values, eg::
1803
1804           {"debian-etch": {"node1": [<object>,...],
1805                            "node2": [<object>,]}
1806           }
1807
1808     """
1809     all_os = {}
1810     # we build here the list of nodes that didn't fail the RPC (at RPC
1811     # level), so that nodes with a non-responding node daemon don't
1812     # make all OSes invalid
1813     good_nodes = [node_name for node_name in rlist
1814                   if not rlist[node_name].failed]
1815     for node_name, nr in rlist.iteritems():
1816       if nr.failed or not nr.data:
1817         continue
1818       for os_obj in nr.data:
1819         if os_obj.name not in all_os:
1820           # build a list of nodes for this os containing empty lists
1821           # for each node in node_list
1822           all_os[os_obj.name] = {}
1823           for nname in good_nodes:
1824             all_os[os_obj.name][nname] = []
1825         all_os[os_obj.name][node_name].append(os_obj)
1826     return all_os
1827
1828   def Exec(self, feedback_fn):
1829     """Compute the list of OSes.
1830
1831     """
1832     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1833     node_data = self.rpc.call_os_diagnose(valid_nodes)
1834     if node_data == False:
1835       raise errors.OpExecError("Can't gather the list of OSes")
1836     pol = self._DiagnoseByOS(valid_nodes, node_data)
1837     output = []
1838     for os_name, os_data in pol.iteritems():
1839       row = []
1840       for field in self.op.output_fields:
1841         if field == "name":
1842           val = os_name
1843         elif field == "valid":
1844           val = utils.all([osl and osl[0] for osl in os_data.values()])
1845         elif field == "node_status":
1846           val = {}
1847           for node_name, nos_list in os_data.iteritems():
1848             val[node_name] = [(v.status, v.path) for v in nos_list]
1849         else:
1850           raise errors.ParameterError(field)
1851         row.append(val)
1852       output.append(row)
1853
1854     return output
1855
1856
1857 class LURemoveNode(LogicalUnit):
1858   """Logical unit for removing a node.
1859
1860   """
1861   HPATH = "node-remove"
1862   HTYPE = constants.HTYPE_NODE
1863   _OP_REQP = ["node_name"]
1864
1865   def BuildHooksEnv(self):
1866     """Build hooks env.
1867
1868     This doesn't run on the target node in the pre phase as a failed
1869     node would then be impossible to remove.
1870
1871     """
1872     env = {
1873       "OP_TARGET": self.op.node_name,
1874       "NODE_NAME": self.op.node_name,
1875       }
1876     all_nodes = self.cfg.GetNodeList()
1877     all_nodes.remove(self.op.node_name)
1878     return env, all_nodes, all_nodes
1879
1880   def CheckPrereq(self):
1881     """Check prerequisites.
1882
1883     This checks:
1884      - the node exists in the configuration
1885      - it does not have primary or secondary instances
1886      - it's not the master
1887
1888     Any errors are signalled by raising errors.OpPrereqError.
1889
1890     """
1891     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1892     if node is None:
1893       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1894
1895     instance_list = self.cfg.GetInstanceList()
1896
1897     masternode = self.cfg.GetMasterNode()
1898     if node.name == masternode:
1899       raise errors.OpPrereqError("Node is the master node,"
1900                                  " you need to failover first.")
1901
1902     for instance_name in instance_list:
1903       instance = self.cfg.GetInstanceInfo(instance_name)
1904       if node.name in instance.all_nodes:
1905         raise errors.OpPrereqError("Instance %s is still running on the node,"
1906                                    " please remove first." % instance_name)
1907     self.op.node_name = node.name
1908     self.node = node
1909
1910   def Exec(self, feedback_fn):
1911     """Removes the node from the cluster.
1912
1913     """
1914     node = self.node
1915     logging.info("Stopping the node daemon and removing configs from node %s",
1916                  node.name)
1917
1918     self.context.RemoveNode(node.name)
1919
1920     result = self.rpc.call_node_leave_cluster(node.name)
1921     msg = result.RemoteFailMsg()
1922     if msg:
1923       self.LogWarning("Errors encountered on the remote node while leaving"
1924                       " the cluster: %s", msg)
1925
1926     # Promote nodes to master candidate as needed
1927     _AdjustCandidatePool(self)
1928
1929
1930 class LUQueryNodes(NoHooksLU):
1931   """Logical unit for querying nodes.
1932
1933   """
1934   _OP_REQP = ["output_fields", "names", "use_locking"]
1935   REQ_BGL = False
1936   _FIELDS_DYNAMIC = utils.FieldSet(
1937     "dtotal", "dfree",
1938     "mtotal", "mnode", "mfree",
1939     "bootid",
1940     "ctotal", "cnodes", "csockets",
1941     )
1942
1943   _FIELDS_STATIC = utils.FieldSet(
1944     "name", "pinst_cnt", "sinst_cnt",
1945     "pinst_list", "sinst_list",
1946     "pip", "sip", "tags",
1947     "serial_no",
1948     "master_candidate",
1949     "master",
1950     "offline",
1951     "drained",
1952     )
1953
1954   def ExpandNames(self):
1955     _CheckOutputFields(static=self._FIELDS_STATIC,
1956                        dynamic=self._FIELDS_DYNAMIC,
1957                        selected=self.op.output_fields)
1958
1959     self.needed_locks = {}
1960     self.share_locks[locking.LEVEL_NODE] = 1
1961
1962     if self.op.names:
1963       self.wanted = _GetWantedNodes(self, self.op.names)
1964     else:
1965       self.wanted = locking.ALL_SET
1966
1967     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1968     self.do_locking = self.do_node_query and self.op.use_locking
1969     if self.do_locking:
1970       # if we don't request only static fields, we need to lock the nodes
1971       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1972
1973
1974   def CheckPrereq(self):
1975     """Check prerequisites.
1976
1977     """
1978     # The validation of the node list is done in the _GetWantedNodes,
1979     # if non empty, and if empty, there's no validation to do
1980     pass
1981
1982   def Exec(self, feedback_fn):
1983     """Computes the list of nodes and their attributes.
1984
1985     """
1986     all_info = self.cfg.GetAllNodesInfo()
1987     if self.do_locking:
1988       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1989     elif self.wanted != locking.ALL_SET:
1990       nodenames = self.wanted
1991       missing = set(nodenames).difference(all_info.keys())
1992       if missing:
1993         raise errors.OpExecError(
1994           "Some nodes were removed before retrieving their data: %s" % missing)
1995     else:
1996       nodenames = all_info.keys()
1997
1998     nodenames = utils.NiceSort(nodenames)
1999     nodelist = [all_info[name] for name in nodenames]
2000
2001     # begin data gathering
2002
2003     if self.do_node_query:
2004       live_data = {}
2005       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2006                                           self.cfg.GetHypervisorType())
2007       for name in nodenames:
2008         nodeinfo = node_data[name]
2009         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2010           nodeinfo = nodeinfo.payload
2011           fn = utils.TryConvert
2012           live_data[name] = {
2013             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2014             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2015             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2016             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2017             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2018             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2019             "bootid": nodeinfo.get('bootid', None),
2020             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2021             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2022             }
2023         else:
2024           live_data[name] = {}
2025     else:
2026       live_data = dict.fromkeys(nodenames, {})
2027
2028     node_to_primary = dict([(name, set()) for name in nodenames])
2029     node_to_secondary = dict([(name, set()) for name in nodenames])
2030
2031     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2032                              "sinst_cnt", "sinst_list"))
2033     if inst_fields & frozenset(self.op.output_fields):
2034       instancelist = self.cfg.GetInstanceList()
2035
2036       for instance_name in instancelist:
2037         inst = self.cfg.GetInstanceInfo(instance_name)
2038         if inst.primary_node in node_to_primary:
2039           node_to_primary[inst.primary_node].add(inst.name)
2040         for secnode in inst.secondary_nodes:
2041           if secnode in node_to_secondary:
2042             node_to_secondary[secnode].add(inst.name)
2043
2044     master_node = self.cfg.GetMasterNode()
2045
2046     # end data gathering
2047
2048     output = []
2049     for node in nodelist:
2050       node_output = []
2051       for field in self.op.output_fields:
2052         if field == "name":
2053           val = node.name
2054         elif field == "pinst_list":
2055           val = list(node_to_primary[node.name])
2056         elif field == "sinst_list":
2057           val = list(node_to_secondary[node.name])
2058         elif field == "pinst_cnt":
2059           val = len(node_to_primary[node.name])
2060         elif field == "sinst_cnt":
2061           val = len(node_to_secondary[node.name])
2062         elif field == "pip":
2063           val = node.primary_ip
2064         elif field == "sip":
2065           val = node.secondary_ip
2066         elif field == "tags":
2067           val = list(node.GetTags())
2068         elif field == "serial_no":
2069           val = node.serial_no
2070         elif field == "master_candidate":
2071           val = node.master_candidate
2072         elif field == "master":
2073           val = node.name == master_node
2074         elif field == "offline":
2075           val = node.offline
2076         elif field == "drained":
2077           val = node.drained
2078         elif self._FIELDS_DYNAMIC.Matches(field):
2079           val = live_data[node.name].get(field, None)
2080         else:
2081           raise errors.ParameterError(field)
2082         node_output.append(val)
2083       output.append(node_output)
2084
2085     return output
2086
2087
2088 class LUQueryNodeVolumes(NoHooksLU):
2089   """Logical unit for getting volumes on node(s).
2090
2091   """
2092   _OP_REQP = ["nodes", "output_fields"]
2093   REQ_BGL = False
2094   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2095   _FIELDS_STATIC = utils.FieldSet("node")
2096
2097   def ExpandNames(self):
2098     _CheckOutputFields(static=self._FIELDS_STATIC,
2099                        dynamic=self._FIELDS_DYNAMIC,
2100                        selected=self.op.output_fields)
2101
2102     self.needed_locks = {}
2103     self.share_locks[locking.LEVEL_NODE] = 1
2104     if not self.op.nodes:
2105       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2106     else:
2107       self.needed_locks[locking.LEVEL_NODE] = \
2108         _GetWantedNodes(self, self.op.nodes)
2109
2110   def CheckPrereq(self):
2111     """Check prerequisites.
2112
2113     This checks that the fields required are valid output fields.
2114
2115     """
2116     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2117
2118   def Exec(self, feedback_fn):
2119     """Computes the list of nodes and their attributes.
2120
2121     """
2122     nodenames = self.nodes
2123     volumes = self.rpc.call_node_volumes(nodenames)
2124
2125     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2126              in self.cfg.GetInstanceList()]
2127
2128     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2129
2130     output = []
2131     for node in nodenames:
2132       nresult = volumes[node]
2133       if nresult.offline:
2134         continue
2135       msg = nresult.RemoteFailMsg()
2136       if msg:
2137         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2138         continue
2139
2140       node_vols = nresult.payload[:]
2141       node_vols.sort(key=lambda vol: vol['dev'])
2142
2143       for vol in node_vols:
2144         node_output = []
2145         for field in self.op.output_fields:
2146           if field == "node":
2147             val = node
2148           elif field == "phys":
2149             val = vol['dev']
2150           elif field == "vg":
2151             val = vol['vg']
2152           elif field == "name":
2153             val = vol['name']
2154           elif field == "size":
2155             val = int(float(vol['size']))
2156           elif field == "instance":
2157             for inst in ilist:
2158               if node not in lv_by_node[inst]:
2159                 continue
2160               if vol['name'] in lv_by_node[inst][node]:
2161                 val = inst.name
2162                 break
2163             else:
2164               val = '-'
2165           else:
2166             raise errors.ParameterError(field)
2167           node_output.append(str(val))
2168
2169         output.append(node_output)
2170
2171     return output
2172
2173
2174 class LUAddNode(LogicalUnit):
2175   """Logical unit for adding node to the cluster.
2176
2177   """
2178   HPATH = "node-add"
2179   HTYPE = constants.HTYPE_NODE
2180   _OP_REQP = ["node_name"]
2181
2182   def BuildHooksEnv(self):
2183     """Build hooks env.
2184
2185     This will run on all nodes before, and on all nodes + the new node after.
2186
2187     """
2188     env = {
2189       "OP_TARGET": self.op.node_name,
2190       "NODE_NAME": self.op.node_name,
2191       "NODE_PIP": self.op.primary_ip,
2192       "NODE_SIP": self.op.secondary_ip,
2193       }
2194     nodes_0 = self.cfg.GetNodeList()
2195     nodes_1 = nodes_0 + [self.op.node_name, ]
2196     return env, nodes_0, nodes_1
2197
2198   def CheckPrereq(self):
2199     """Check prerequisites.
2200
2201     This checks:
2202      - the new node is not already in the config
2203      - it is resolvable
2204      - its parameters (single/dual homed) matches the cluster
2205
2206     Any errors are signalled by raising errors.OpPrereqError.
2207
2208     """
2209     node_name = self.op.node_name
2210     cfg = self.cfg
2211
2212     dns_data = utils.HostInfo(node_name)
2213
2214     node = dns_data.name
2215     primary_ip = self.op.primary_ip = dns_data.ip
2216     secondary_ip = getattr(self.op, "secondary_ip", None)
2217     if secondary_ip is None:
2218       secondary_ip = primary_ip
2219     if not utils.IsValidIP(secondary_ip):
2220       raise errors.OpPrereqError("Invalid secondary IP given")
2221     self.op.secondary_ip = secondary_ip
2222
2223     node_list = cfg.GetNodeList()
2224     if not self.op.readd and node in node_list:
2225       raise errors.OpPrereqError("Node %s is already in the configuration" %
2226                                  node)
2227     elif self.op.readd and node not in node_list:
2228       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2229
2230     for existing_node_name in node_list:
2231       existing_node = cfg.GetNodeInfo(existing_node_name)
2232
2233       if self.op.readd and node == existing_node_name:
2234         if (existing_node.primary_ip != primary_ip or
2235             existing_node.secondary_ip != secondary_ip):
2236           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2237                                      " address configuration as before")
2238         continue
2239
2240       if (existing_node.primary_ip == primary_ip or
2241           existing_node.secondary_ip == primary_ip or
2242           existing_node.primary_ip == secondary_ip or
2243           existing_node.secondary_ip == secondary_ip):
2244         raise errors.OpPrereqError("New node ip address(es) conflict with"
2245                                    " existing node %s" % existing_node.name)
2246
2247     # check that the type of the node (single versus dual homed) is the
2248     # same as for the master
2249     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2250     master_singlehomed = myself.secondary_ip == myself.primary_ip
2251     newbie_singlehomed = secondary_ip == primary_ip
2252     if master_singlehomed != newbie_singlehomed:
2253       if master_singlehomed:
2254         raise errors.OpPrereqError("The master has no private ip but the"
2255                                    " new node has one")
2256       else:
2257         raise errors.OpPrereqError("The master has a private ip but the"
2258                                    " new node doesn't have one")
2259
2260     # checks reachablity
2261     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2262       raise errors.OpPrereqError("Node not reachable by ping")
2263
2264     if not newbie_singlehomed:
2265       # check reachability from my secondary ip to newbie's secondary ip
2266       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2267                            source=myself.secondary_ip):
2268         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2269                                    " based ping to noded port")
2270
2271     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2272     mc_now, _ = self.cfg.GetMasterCandidateStats()
2273     master_candidate = mc_now < cp_size
2274
2275     self.new_node = objects.Node(name=node,
2276                                  primary_ip=primary_ip,
2277                                  secondary_ip=secondary_ip,
2278                                  master_candidate=master_candidate,
2279                                  offline=False, drained=False)
2280
2281   def Exec(self, feedback_fn):
2282     """Adds the new node to the cluster.
2283
2284     """
2285     new_node = self.new_node
2286     node = new_node.name
2287
2288     # check connectivity
2289     result = self.rpc.call_version([node])[node]
2290     result.Raise()
2291     if result.data:
2292       if constants.PROTOCOL_VERSION == result.data:
2293         logging.info("Communication to node %s fine, sw version %s match",
2294                      node, result.data)
2295       else:
2296         raise errors.OpExecError("Version mismatch master version %s,"
2297                                  " node version %s" %
2298                                  (constants.PROTOCOL_VERSION, result.data))
2299     else:
2300       raise errors.OpExecError("Cannot get version from the new node")
2301
2302     # setup ssh on node
2303     logging.info("Copy ssh key to node %s", node)
2304     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2305     keyarray = []
2306     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2307                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2308                 priv_key, pub_key]
2309
2310     for i in keyfiles:
2311       f = open(i, 'r')
2312       try:
2313         keyarray.append(f.read())
2314       finally:
2315         f.close()
2316
2317     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2318                                     keyarray[2],
2319                                     keyarray[3], keyarray[4], keyarray[5])
2320
2321     msg = result.RemoteFailMsg()
2322     if msg:
2323       raise errors.OpExecError("Cannot transfer ssh keys to the"
2324                                " new node: %s" % msg)
2325
2326     # Add node to our /etc/hosts, and add key to known_hosts
2327     if self.cfg.GetClusterInfo().modify_etc_hosts:
2328       utils.AddHostToEtcHosts(new_node.name)
2329
2330     if new_node.secondary_ip != new_node.primary_ip:
2331       result = self.rpc.call_node_has_ip_address(new_node.name,
2332                                                  new_node.secondary_ip)
2333       msg = result.RemoteFailMsg()
2334       if msg:
2335         raise errors.OpPrereqError("Failure checking secondary ip"
2336                                    " on node %s: %s" % (new_node.name, msg))
2337       if not result.payload:
2338         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2339                                  " you gave (%s). Please fix and re-run this"
2340                                  " command." % new_node.secondary_ip)
2341
2342     node_verify_list = [self.cfg.GetMasterNode()]
2343     node_verify_param = {
2344       'nodelist': [node],
2345       # TODO: do a node-net-test as well?
2346     }
2347
2348     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2349                                        self.cfg.GetClusterName())
2350     for verifier in node_verify_list:
2351       msg = result[verifier].RemoteFailMsg()
2352       if msg:
2353         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2354                                  (verifier, msg))
2355       nl_payload = result[verifier].payload['nodelist']
2356       if nl_payload:
2357         for failed in nl_payload:
2358           feedback_fn("ssh/hostname verification failed %s -> %s" %
2359                       (verifier, nl_payload[failed]))
2360         raise errors.OpExecError("ssh/hostname verification failed.")
2361
2362     if self.op.readd:
2363       _RedistributeAncillaryFiles(self)
2364       self.context.ReaddNode(new_node)
2365     else:
2366       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2367       self.context.AddNode(new_node)
2368
2369
2370 class LUSetNodeParams(LogicalUnit):
2371   """Modifies the parameters of a node.
2372
2373   """
2374   HPATH = "node-modify"
2375   HTYPE = constants.HTYPE_NODE
2376   _OP_REQP = ["node_name"]
2377   REQ_BGL = False
2378
2379   def CheckArguments(self):
2380     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2381     if node_name is None:
2382       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2383     self.op.node_name = node_name
2384     _CheckBooleanOpField(self.op, 'master_candidate')
2385     _CheckBooleanOpField(self.op, 'offline')
2386     _CheckBooleanOpField(self.op, 'drained')
2387     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2388     if all_mods.count(None) == 3:
2389       raise errors.OpPrereqError("Please pass at least one modification")
2390     if all_mods.count(True) > 1:
2391       raise errors.OpPrereqError("Can't set the node into more than one"
2392                                  " state at the same time")
2393
2394   def ExpandNames(self):
2395     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2396
2397   def BuildHooksEnv(self):
2398     """Build hooks env.
2399
2400     This runs on the master node.
2401
2402     """
2403     env = {
2404       "OP_TARGET": self.op.node_name,
2405       "MASTER_CANDIDATE": str(self.op.master_candidate),
2406       "OFFLINE": str(self.op.offline),
2407       "DRAINED": str(self.op.drained),
2408       }
2409     nl = [self.cfg.GetMasterNode(),
2410           self.op.node_name]
2411     return env, nl, nl
2412
2413   def CheckPrereq(self):
2414     """Check prerequisites.
2415
2416     This only checks the instance list against the existing names.
2417
2418     """
2419     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2420
2421     if ((self.op.master_candidate == False or self.op.offline == True or
2422          self.op.drained == True) and node.master_candidate):
2423       # we will demote the node from master_candidate
2424       if self.op.node_name == self.cfg.GetMasterNode():
2425         raise errors.OpPrereqError("The master node has to be a"
2426                                    " master candidate, online and not drained")
2427       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2428       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2429       if num_candidates <= cp_size:
2430         msg = ("Not enough master candidates (desired"
2431                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2432         if self.op.force:
2433           self.LogWarning(msg)
2434         else:
2435           raise errors.OpPrereqError(msg)
2436
2437     if (self.op.master_candidate == True and
2438         ((node.offline and not self.op.offline == False) or
2439          (node.drained and not self.op.drained == False))):
2440       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2441                                  " to master_candidate" % node.name)
2442
2443     return
2444
2445   def Exec(self, feedback_fn):
2446     """Modifies a node.
2447
2448     """
2449     node = self.node
2450
2451     result = []
2452     changed_mc = False
2453
2454     if self.op.offline is not None:
2455       node.offline = self.op.offline
2456       result.append(("offline", str(self.op.offline)))
2457       if self.op.offline == True:
2458         if node.master_candidate:
2459           node.master_candidate = False
2460           changed_mc = True
2461           result.append(("master_candidate", "auto-demotion due to offline"))
2462         if node.drained:
2463           node.drained = False
2464           result.append(("drained", "clear drained status due to offline"))
2465
2466     if self.op.master_candidate is not None:
2467       node.master_candidate = self.op.master_candidate
2468       changed_mc = True
2469       result.append(("master_candidate", str(self.op.master_candidate)))
2470       if self.op.master_candidate == False:
2471         rrc = self.rpc.call_node_demote_from_mc(node.name)
2472         msg = rrc.RemoteFailMsg()
2473         if msg:
2474           self.LogWarning("Node failed to demote itself: %s" % msg)
2475
2476     if self.op.drained is not None:
2477       node.drained = self.op.drained
2478       result.append(("drained", str(self.op.drained)))
2479       if self.op.drained == True:
2480         if node.master_candidate:
2481           node.master_candidate = False
2482           changed_mc = True
2483           result.append(("master_candidate", "auto-demotion due to drain"))
2484         if node.offline:
2485           node.offline = False
2486           result.append(("offline", "clear offline status due to drain"))
2487
2488     # this will trigger configuration file update, if needed
2489     self.cfg.Update(node)
2490     # this will trigger job queue propagation or cleanup
2491     if changed_mc:
2492       self.context.ReaddNode(node)
2493
2494     return result
2495
2496
2497 class LUPowercycleNode(NoHooksLU):
2498   """Powercycles a node.
2499
2500   """
2501   _OP_REQP = ["node_name", "force"]
2502   REQ_BGL = False
2503
2504   def CheckArguments(self):
2505     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2506     if node_name is None:
2507       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2508     self.op.node_name = node_name
2509     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2510       raise errors.OpPrereqError("The node is the master and the force"
2511                                  " parameter was not set")
2512
2513   def ExpandNames(self):
2514     """Locking for PowercycleNode.
2515
2516     This is a last-resource option and shouldn't block on other
2517     jobs. Therefore, we grab no locks.
2518
2519     """
2520     self.needed_locks = {}
2521
2522   def CheckPrereq(self):
2523     """Check prerequisites.
2524
2525     This LU has no prereqs.
2526
2527     """
2528     pass
2529
2530   def Exec(self, feedback_fn):
2531     """Reboots a node.
2532
2533     """
2534     result = self.rpc.call_node_powercycle(self.op.node_name,
2535                                            self.cfg.GetHypervisorType())
2536     msg = result.RemoteFailMsg()
2537     if msg:
2538       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2539     return result.payload
2540
2541
2542 class LUQueryClusterInfo(NoHooksLU):
2543   """Query cluster configuration.
2544
2545   """
2546   _OP_REQP = []
2547   REQ_BGL = False
2548
2549   def ExpandNames(self):
2550     self.needed_locks = {}
2551
2552   def CheckPrereq(self):
2553     """No prerequsites needed for this LU.
2554
2555     """
2556     pass
2557
2558   def Exec(self, feedback_fn):
2559     """Return cluster config.
2560
2561     """
2562     cluster = self.cfg.GetClusterInfo()
2563     result = {
2564       "software_version": constants.RELEASE_VERSION,
2565       "protocol_version": constants.PROTOCOL_VERSION,
2566       "config_version": constants.CONFIG_VERSION,
2567       "os_api_version": constants.OS_API_VERSION,
2568       "export_version": constants.EXPORT_VERSION,
2569       "architecture": (platform.architecture()[0], platform.machine()),
2570       "name": cluster.cluster_name,
2571       "master": cluster.master_node,
2572       "default_hypervisor": cluster.default_hypervisor,
2573       "enabled_hypervisors": cluster.enabled_hypervisors,
2574       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2575                         for hypervisor in cluster.enabled_hypervisors]),
2576       "beparams": cluster.beparams,
2577       "nicparams": cluster.nicparams,
2578       "candidate_pool_size": cluster.candidate_pool_size,
2579       "master_netdev": cluster.master_netdev,
2580       "volume_group_name": cluster.volume_group_name,
2581       "file_storage_dir": cluster.file_storage_dir,
2582       }
2583
2584     return result
2585
2586
2587 class LUQueryConfigValues(NoHooksLU):
2588   """Return configuration values.
2589
2590   """
2591   _OP_REQP = []
2592   REQ_BGL = False
2593   _FIELDS_DYNAMIC = utils.FieldSet()
2594   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2595
2596   def ExpandNames(self):
2597     self.needed_locks = {}
2598
2599     _CheckOutputFields(static=self._FIELDS_STATIC,
2600                        dynamic=self._FIELDS_DYNAMIC,
2601                        selected=self.op.output_fields)
2602
2603   def CheckPrereq(self):
2604     """No prerequisites.
2605
2606     """
2607     pass
2608
2609   def Exec(self, feedback_fn):
2610     """Dump a representation of the cluster config to the standard output.
2611
2612     """
2613     values = []
2614     for field in self.op.output_fields:
2615       if field == "cluster_name":
2616         entry = self.cfg.GetClusterName()
2617       elif field == "master_node":
2618         entry = self.cfg.GetMasterNode()
2619       elif field == "drain_flag":
2620         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2621       else:
2622         raise errors.ParameterError(field)
2623       values.append(entry)
2624     return values
2625
2626
2627 class LUActivateInstanceDisks(NoHooksLU):
2628   """Bring up an instance's disks.
2629
2630   """
2631   _OP_REQP = ["instance_name"]
2632   REQ_BGL = False
2633
2634   def ExpandNames(self):
2635     self._ExpandAndLockInstance()
2636     self.needed_locks[locking.LEVEL_NODE] = []
2637     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2638
2639   def DeclareLocks(self, level):
2640     if level == locking.LEVEL_NODE:
2641       self._LockInstancesNodes()
2642
2643   def CheckPrereq(self):
2644     """Check prerequisites.
2645
2646     This checks that the instance is in the cluster.
2647
2648     """
2649     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2650     assert self.instance is not None, \
2651       "Cannot retrieve locked instance %s" % self.op.instance_name
2652     _CheckNodeOnline(self, self.instance.primary_node)
2653
2654   def Exec(self, feedback_fn):
2655     """Activate the disks.
2656
2657     """
2658     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2659     if not disks_ok:
2660       raise errors.OpExecError("Cannot activate block devices")
2661
2662     return disks_info
2663
2664
2665 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2666   """Prepare the block devices for an instance.
2667
2668   This sets up the block devices on all nodes.
2669
2670   @type lu: L{LogicalUnit}
2671   @param lu: the logical unit on whose behalf we execute
2672   @type instance: L{objects.Instance}
2673   @param instance: the instance for whose disks we assemble
2674   @type ignore_secondaries: boolean
2675   @param ignore_secondaries: if true, errors on secondary nodes
2676       won't result in an error return from the function
2677   @return: False if the operation failed, otherwise a list of
2678       (host, instance_visible_name, node_visible_name)
2679       with the mapping from node devices to instance devices
2680
2681   """
2682   device_info = []
2683   disks_ok = True
2684   iname = instance.name
2685   # With the two passes mechanism we try to reduce the window of
2686   # opportunity for the race condition of switching DRBD to primary
2687   # before handshaking occured, but we do not eliminate it
2688
2689   # The proper fix would be to wait (with some limits) until the
2690   # connection has been made and drbd transitions from WFConnection
2691   # into any other network-connected state (Connected, SyncTarget,
2692   # SyncSource, etc.)
2693
2694   # 1st pass, assemble on all nodes in secondary mode
2695   for inst_disk in instance.disks:
2696     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2697       lu.cfg.SetDiskID(node_disk, node)
2698       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2699       msg = result.RemoteFailMsg()
2700       if msg:
2701         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2702                            " (is_primary=False, pass=1): %s",
2703                            inst_disk.iv_name, node, msg)
2704         if not ignore_secondaries:
2705           disks_ok = False
2706
2707   # FIXME: race condition on drbd migration to primary
2708
2709   # 2nd pass, do only the primary node
2710   for inst_disk in instance.disks:
2711     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2712       if node != instance.primary_node:
2713         continue
2714       lu.cfg.SetDiskID(node_disk, node)
2715       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2716       msg = result.RemoteFailMsg()
2717       if msg:
2718         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2719                            " (is_primary=True, pass=2): %s",
2720                            inst_disk.iv_name, node, msg)
2721         disks_ok = False
2722     device_info.append((instance.primary_node, inst_disk.iv_name,
2723                         result.payload))
2724
2725   # leave the disks configured for the primary node
2726   # this is a workaround that would be fixed better by
2727   # improving the logical/physical id handling
2728   for disk in instance.disks:
2729     lu.cfg.SetDiskID(disk, instance.primary_node)
2730
2731   return disks_ok, device_info
2732
2733
2734 def _StartInstanceDisks(lu, instance, force):
2735   """Start the disks of an instance.
2736
2737   """
2738   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2739                                            ignore_secondaries=force)
2740   if not disks_ok:
2741     _ShutdownInstanceDisks(lu, instance)
2742     if force is not None and not force:
2743       lu.proc.LogWarning("", hint="If the message above refers to a"
2744                          " secondary node,"
2745                          " you can retry the operation using '--force'.")
2746     raise errors.OpExecError("Disk consistency error")
2747
2748
2749 class LUDeactivateInstanceDisks(NoHooksLU):
2750   """Shutdown an instance's disks.
2751
2752   """
2753   _OP_REQP = ["instance_name"]
2754   REQ_BGL = False
2755
2756   def ExpandNames(self):
2757     self._ExpandAndLockInstance()
2758     self.needed_locks[locking.LEVEL_NODE] = []
2759     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2760
2761   def DeclareLocks(self, level):
2762     if level == locking.LEVEL_NODE:
2763       self._LockInstancesNodes()
2764
2765   def CheckPrereq(self):
2766     """Check prerequisites.
2767
2768     This checks that the instance is in the cluster.
2769
2770     """
2771     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2772     assert self.instance is not None, \
2773       "Cannot retrieve locked instance %s" % self.op.instance_name
2774
2775   def Exec(self, feedback_fn):
2776     """Deactivate the disks
2777
2778     """
2779     instance = self.instance
2780     _SafeShutdownInstanceDisks(self, instance)
2781
2782
2783 def _SafeShutdownInstanceDisks(lu, instance):
2784   """Shutdown block devices of an instance.
2785
2786   This function checks if an instance is running, before calling
2787   _ShutdownInstanceDisks.
2788
2789   """
2790   pnode = instance.primary_node
2791   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2792   ins_l = ins_l[pnode]
2793   msg = ins_l.RemoteFailMsg()
2794   if msg:
2795     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2796
2797   if instance.name in ins_l.payload:
2798     raise errors.OpExecError("Instance is running, can't shutdown"
2799                              " block devices.")
2800
2801   _ShutdownInstanceDisks(lu, instance)
2802
2803
2804 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2805   """Shutdown block devices of an instance.
2806
2807   This does the shutdown on all nodes of the instance.
2808
2809   If the ignore_primary is false, errors on the primary node are
2810   ignored.
2811
2812   """
2813   all_result = True
2814   for disk in instance.disks:
2815     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2816       lu.cfg.SetDiskID(top_disk, node)
2817       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2818       msg = result.RemoteFailMsg()
2819       if msg:
2820         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2821                       disk.iv_name, node, msg)
2822         if not ignore_primary or node != instance.primary_node:
2823           all_result = False
2824   return all_result
2825
2826
2827 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2828   """Checks if a node has enough free memory.
2829
2830   This function check if a given node has the needed amount of free
2831   memory. In case the node has less memory or we cannot get the
2832   information from the node, this function raise an OpPrereqError
2833   exception.
2834
2835   @type lu: C{LogicalUnit}
2836   @param lu: a logical unit from which we get configuration data
2837   @type node: C{str}
2838   @param node: the node to check
2839   @type reason: C{str}
2840   @param reason: string to use in the error message
2841   @type requested: C{int}
2842   @param requested: the amount of memory in MiB to check for
2843   @type hypervisor_name: C{str}
2844   @param hypervisor_name: the hypervisor to ask for memory stats
2845   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2846       we cannot check the node
2847
2848   """
2849   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2850   msg = nodeinfo[node].RemoteFailMsg()
2851   if msg:
2852     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2853   free_mem = nodeinfo[node].payload.get('memory_free', None)
2854   if not isinstance(free_mem, int):
2855     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2856                                " was '%s'" % (node, free_mem))
2857   if requested > free_mem:
2858     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2859                                " needed %s MiB, available %s MiB" %
2860                                (node, reason, requested, free_mem))
2861
2862
2863 class LUStartupInstance(LogicalUnit):
2864   """Starts an instance.
2865
2866   """
2867   HPATH = "instance-start"
2868   HTYPE = constants.HTYPE_INSTANCE
2869   _OP_REQP = ["instance_name", "force"]
2870   REQ_BGL = False
2871
2872   def ExpandNames(self):
2873     self._ExpandAndLockInstance()
2874
2875   def BuildHooksEnv(self):
2876     """Build hooks env.
2877
2878     This runs on master, primary and secondary nodes of the instance.
2879
2880     """
2881     env = {
2882       "FORCE": self.op.force,
2883       }
2884     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2885     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2886     return env, nl, nl
2887
2888   def CheckPrereq(self):
2889     """Check prerequisites.
2890
2891     This checks that the instance is in the cluster.
2892
2893     """
2894     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2895     assert self.instance is not None, \
2896       "Cannot retrieve locked instance %s" % self.op.instance_name
2897
2898     # extra beparams
2899     self.beparams = getattr(self.op, "beparams", {})
2900     if self.beparams:
2901       if not isinstance(self.beparams, dict):
2902         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2903                                    " dict" % (type(self.beparams), ))
2904       # fill the beparams dict
2905       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2906       self.op.beparams = self.beparams
2907
2908     # extra hvparams
2909     self.hvparams = getattr(self.op, "hvparams", {})
2910     if self.hvparams:
2911       if not isinstance(self.hvparams, dict):
2912         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2913                                    " dict" % (type(self.hvparams), ))
2914
2915       # check hypervisor parameter syntax (locally)
2916       cluster = self.cfg.GetClusterInfo()
2917       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2918       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2919                                     instance.hvparams)
2920       filled_hvp.update(self.hvparams)
2921       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2922       hv_type.CheckParameterSyntax(filled_hvp)
2923       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2924       self.op.hvparams = self.hvparams
2925
2926     _CheckNodeOnline(self, instance.primary_node)
2927
2928     bep = self.cfg.GetClusterInfo().FillBE(instance)
2929     # check bridges existance
2930     _CheckInstanceBridgesExist(self, instance)
2931
2932     remote_info = self.rpc.call_instance_info(instance.primary_node,
2933                                               instance.name,
2934                                               instance.hypervisor)
2935     msg = remote_info.RemoteFailMsg()
2936     if msg:
2937       raise errors.OpPrereqError("Error checking node %s: %s" %
2938                                  (instance.primary_node, msg))
2939     if not remote_info.payload: # not running already
2940       _CheckNodeFreeMemory(self, instance.primary_node,
2941                            "starting instance %s" % instance.name,
2942                            bep[constants.BE_MEMORY], instance.hypervisor)
2943
2944   def Exec(self, feedback_fn):
2945     """Start the instance.
2946
2947     """
2948     instance = self.instance
2949     force = self.op.force
2950
2951     self.cfg.MarkInstanceUp(instance.name)
2952
2953     node_current = instance.primary_node
2954
2955     _StartInstanceDisks(self, instance, force)
2956
2957     result = self.rpc.call_instance_start(node_current, instance,
2958                                           self.hvparams, self.beparams)
2959     msg = result.RemoteFailMsg()
2960     if msg:
2961       _ShutdownInstanceDisks(self, instance)
2962       raise errors.OpExecError("Could not start instance: %s" % msg)
2963
2964
2965 class LURebootInstance(LogicalUnit):
2966   """Reboot an instance.
2967
2968   """
2969   HPATH = "instance-reboot"
2970   HTYPE = constants.HTYPE_INSTANCE
2971   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2972   REQ_BGL = False
2973
2974   def ExpandNames(self):
2975     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2976                                    constants.INSTANCE_REBOOT_HARD,
2977                                    constants.INSTANCE_REBOOT_FULL]:
2978       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2979                                   (constants.INSTANCE_REBOOT_SOFT,
2980                                    constants.INSTANCE_REBOOT_HARD,
2981                                    constants.INSTANCE_REBOOT_FULL))
2982     self._ExpandAndLockInstance()
2983
2984   def BuildHooksEnv(self):
2985     """Build hooks env.
2986
2987     This runs on master, primary and secondary nodes of the instance.
2988
2989     """
2990     env = {
2991       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2992       "REBOOT_TYPE": self.op.reboot_type,
2993       }
2994     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2995     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2996     return env, nl, nl
2997
2998   def CheckPrereq(self):
2999     """Check prerequisites.
3000
3001     This checks that the instance is in the cluster.
3002
3003     """
3004     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3005     assert self.instance is not None, \
3006       "Cannot retrieve locked instance %s" % self.op.instance_name
3007
3008     _CheckNodeOnline(self, instance.primary_node)
3009
3010     # check bridges existance
3011     _CheckInstanceBridgesExist(self, instance)
3012
3013   def Exec(self, feedback_fn):
3014     """Reboot the instance.
3015
3016     """
3017     instance = self.instance
3018     ignore_secondaries = self.op.ignore_secondaries
3019     reboot_type = self.op.reboot_type
3020
3021     node_current = instance.primary_node
3022
3023     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3024                        constants.INSTANCE_REBOOT_HARD]:
3025       for disk in instance.disks:
3026         self.cfg.SetDiskID(disk, node_current)
3027       result = self.rpc.call_instance_reboot(node_current, instance,
3028                                              reboot_type)
3029       msg = result.RemoteFailMsg()
3030       if msg:
3031         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3032     else:
3033       result = self.rpc.call_instance_shutdown(node_current, instance)
3034       msg = result.RemoteFailMsg()
3035       if msg:
3036         raise errors.OpExecError("Could not shutdown instance for"
3037                                  " full reboot: %s" % msg)
3038       _ShutdownInstanceDisks(self, instance)
3039       _StartInstanceDisks(self, instance, ignore_secondaries)
3040       result = self.rpc.call_instance_start(node_current, instance, None, None)
3041       msg = result.RemoteFailMsg()
3042       if msg:
3043         _ShutdownInstanceDisks(self, instance)
3044         raise errors.OpExecError("Could not start instance for"
3045                                  " full reboot: %s" % msg)
3046
3047     self.cfg.MarkInstanceUp(instance.name)
3048
3049
3050 class LUShutdownInstance(LogicalUnit):
3051   """Shutdown an instance.
3052
3053   """
3054   HPATH = "instance-stop"
3055   HTYPE = constants.HTYPE_INSTANCE
3056   _OP_REQP = ["instance_name"]
3057   REQ_BGL = False
3058
3059   def ExpandNames(self):
3060     self._ExpandAndLockInstance()
3061
3062   def BuildHooksEnv(self):
3063     """Build hooks env.
3064
3065     This runs on master, primary and secondary nodes of the instance.
3066
3067     """
3068     env = _BuildInstanceHookEnvByObject(self, self.instance)
3069     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3070     return env, nl, nl
3071
3072   def CheckPrereq(self):
3073     """Check prerequisites.
3074
3075     This checks that the instance is in the cluster.
3076
3077     """
3078     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3079     assert self.instance is not None, \
3080       "Cannot retrieve locked instance %s" % self.op.instance_name
3081     _CheckNodeOnline(self, self.instance.primary_node)
3082
3083   def Exec(self, feedback_fn):
3084     """Shutdown the instance.
3085
3086     """
3087     instance = self.instance
3088     node_current = instance.primary_node
3089     self.cfg.MarkInstanceDown(instance.name)
3090     result = self.rpc.call_instance_shutdown(node_current, instance)
3091     msg = result.RemoteFailMsg()
3092     if msg:
3093       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3094
3095     _ShutdownInstanceDisks(self, instance)
3096
3097
3098 class LUReinstallInstance(LogicalUnit):
3099   """Reinstall an instance.
3100
3101   """
3102   HPATH = "instance-reinstall"
3103   HTYPE = constants.HTYPE_INSTANCE
3104   _OP_REQP = ["instance_name"]
3105   REQ_BGL = False
3106
3107   def ExpandNames(self):
3108     self._ExpandAndLockInstance()
3109
3110   def BuildHooksEnv(self):
3111     """Build hooks env.
3112
3113     This runs on master, primary and secondary nodes of the instance.
3114
3115     """
3116     env = _BuildInstanceHookEnvByObject(self, self.instance)
3117     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3118     return env, nl, nl
3119
3120   def CheckPrereq(self):
3121     """Check prerequisites.
3122
3123     This checks that the instance is in the cluster and is not running.
3124
3125     """
3126     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3127     assert instance is not None, \
3128       "Cannot retrieve locked instance %s" % self.op.instance_name
3129     _CheckNodeOnline(self, instance.primary_node)
3130
3131     if instance.disk_template == constants.DT_DISKLESS:
3132       raise errors.OpPrereqError("Instance '%s' has no disks" %
3133                                  self.op.instance_name)
3134     if instance.admin_up:
3135       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3136                                  self.op.instance_name)
3137     remote_info = self.rpc.call_instance_info(instance.primary_node,
3138                                               instance.name,
3139                                               instance.hypervisor)
3140     msg = remote_info.RemoteFailMsg()
3141     if msg:
3142       raise errors.OpPrereqError("Error checking node %s: %s" %
3143                                  (instance.primary_node, msg))
3144     if remote_info.payload:
3145       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3146                                  (self.op.instance_name,
3147                                   instance.primary_node))
3148
3149     self.op.os_type = getattr(self.op, "os_type", None)
3150     if self.op.os_type is not None:
3151       # OS verification
3152       pnode = self.cfg.GetNodeInfo(
3153         self.cfg.ExpandNodeName(instance.primary_node))
3154       if pnode is None:
3155         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3156                                    self.op.pnode)
3157       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3158       result.Raise()
3159       if not isinstance(result.data, objects.OS):
3160         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3161                                    " primary node"  % self.op.os_type)
3162
3163     self.instance = instance
3164
3165   def Exec(self, feedback_fn):
3166     """Reinstall the instance.
3167
3168     """
3169     inst = self.instance
3170
3171     if self.op.os_type is not None:
3172       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3173       inst.os = self.op.os_type
3174       self.cfg.Update(inst)
3175
3176     _StartInstanceDisks(self, inst, None)
3177     try:
3178       feedback_fn("Running the instance OS create scripts...")
3179       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3180       msg = result.RemoteFailMsg()
3181       if msg:
3182         raise errors.OpExecError("Could not install OS for instance %s"
3183                                  " on node %s: %s" %
3184                                  (inst.name, inst.primary_node, msg))
3185     finally:
3186       _ShutdownInstanceDisks(self, inst)
3187
3188
3189 class LURenameInstance(LogicalUnit):
3190   """Rename an instance.
3191
3192   """
3193   HPATH = "instance-rename"
3194   HTYPE = constants.HTYPE_INSTANCE
3195   _OP_REQP = ["instance_name", "new_name"]
3196
3197   def BuildHooksEnv(self):
3198     """Build hooks env.
3199
3200     This runs on master, primary and secondary nodes of the instance.
3201
3202     """
3203     env = _BuildInstanceHookEnvByObject(self, self.instance)
3204     env["INSTANCE_NEW_NAME"] = self.op.new_name
3205     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3206     return env, nl, nl
3207
3208   def CheckPrereq(self):
3209     """Check prerequisites.
3210
3211     This checks that the instance is in the cluster and is not running.
3212
3213     """
3214     instance = self.cfg.GetInstanceInfo(
3215       self.cfg.ExpandInstanceName(self.op.instance_name))
3216     if instance is None:
3217       raise errors.OpPrereqError("Instance '%s' not known" %
3218                                  self.op.instance_name)
3219     _CheckNodeOnline(self, instance.primary_node)
3220
3221     if instance.admin_up:
3222       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3223                                  self.op.instance_name)
3224     remote_info = self.rpc.call_instance_info(instance.primary_node,
3225                                               instance.name,
3226                                               instance.hypervisor)
3227     msg = remote_info.RemoteFailMsg()
3228     if msg:
3229       raise errors.OpPrereqError("Error checking node %s: %s" %
3230                                  (instance.primary_node, msg))
3231     if remote_info.payload:
3232       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3233                                  (self.op.instance_name,
3234                                   instance.primary_node))
3235     self.instance = instance
3236
3237     # new name verification
3238     name_info = utils.HostInfo(self.op.new_name)
3239
3240     self.op.new_name = new_name = name_info.name
3241     instance_list = self.cfg.GetInstanceList()
3242     if new_name in instance_list:
3243       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3244                                  new_name)
3245
3246     if not getattr(self.op, "ignore_ip", False):
3247       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3248         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3249                                    (name_info.ip, new_name))
3250
3251
3252   def Exec(self, feedback_fn):
3253     """Reinstall the instance.
3254
3255     """
3256     inst = self.instance
3257     old_name = inst.name
3258
3259     if inst.disk_template == constants.DT_FILE:
3260       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3261
3262     self.cfg.RenameInstance(inst.name, self.op.new_name)
3263     # Change the instance lock. This is definitely safe while we hold the BGL
3264     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3265     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3266
3267     # re-read the instance from the configuration after rename
3268     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3269
3270     if inst.disk_template == constants.DT_FILE:
3271       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3272       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3273                                                      old_file_storage_dir,
3274                                                      new_file_storage_dir)
3275       result.Raise()
3276       if not result.data:
3277         raise errors.OpExecError("Could not connect to node '%s' to rename"
3278                                  " directory '%s' to '%s' (but the instance"
3279                                  " has been renamed in Ganeti)" % (
3280                                  inst.primary_node, old_file_storage_dir,
3281                                  new_file_storage_dir))
3282
3283       if not result.data[0]:
3284         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3285                                  " (but the instance has been renamed in"
3286                                  " Ganeti)" % (old_file_storage_dir,
3287                                                new_file_storage_dir))
3288
3289     _StartInstanceDisks(self, inst, None)
3290     try:
3291       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3292                                                  old_name)
3293       msg = result.RemoteFailMsg()
3294       if msg:
3295         msg = ("Could not run OS rename script for instance %s on node %s"
3296                " (but the instance has been renamed in Ganeti): %s" %
3297                (inst.name, inst.primary_node, msg))
3298         self.proc.LogWarning(msg)
3299     finally:
3300       _ShutdownInstanceDisks(self, inst)
3301
3302
3303 class LURemoveInstance(LogicalUnit):
3304   """Remove an instance.
3305
3306   """
3307   HPATH = "instance-remove"
3308   HTYPE = constants.HTYPE_INSTANCE
3309   _OP_REQP = ["instance_name", "ignore_failures"]
3310   REQ_BGL = False
3311
3312   def ExpandNames(self):
3313     self._ExpandAndLockInstance()
3314     self.needed_locks[locking.LEVEL_NODE] = []
3315     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3316
3317   def DeclareLocks(self, level):
3318     if level == locking.LEVEL_NODE:
3319       self._LockInstancesNodes()
3320
3321   def BuildHooksEnv(self):
3322     """Build hooks env.
3323
3324     This runs on master, primary and secondary nodes of the instance.
3325
3326     """
3327     env = _BuildInstanceHookEnvByObject(self, self.instance)
3328     nl = [self.cfg.GetMasterNode()]
3329     return env, nl, nl
3330
3331   def CheckPrereq(self):
3332     """Check prerequisites.
3333
3334     This checks that the instance is in the cluster.
3335
3336     """
3337     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3338     assert self.instance is not None, \
3339       "Cannot retrieve locked instance %s" % self.op.instance_name
3340
3341   def Exec(self, feedback_fn):
3342     """Remove the instance.
3343
3344     """
3345     instance = self.instance
3346     logging.info("Shutting down instance %s on node %s",
3347                  instance.name, instance.primary_node)
3348
3349     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3350     msg = result.RemoteFailMsg()
3351     if msg:
3352       if self.op.ignore_failures:
3353         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3354       else:
3355         raise errors.OpExecError("Could not shutdown instance %s on"
3356                                  " node %s: %s" %
3357                                  (instance.name, instance.primary_node, msg))
3358
3359     logging.info("Removing block devices for instance %s", instance.name)
3360
3361     if not _RemoveDisks(self, instance):
3362       if self.op.ignore_failures:
3363         feedback_fn("Warning: can't remove instance's disks")
3364       else:
3365         raise errors.OpExecError("Can't remove instance's disks")
3366
3367     logging.info("Removing instance %s out of cluster config", instance.name)
3368
3369     self.cfg.RemoveInstance(instance.name)
3370     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3371
3372
3373 class LUQueryInstances(NoHooksLU):
3374   """Logical unit for querying instances.
3375
3376   """
3377   _OP_REQP = ["output_fields", "names", "use_locking"]
3378   REQ_BGL = False
3379   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3380                                     "admin_state",
3381                                     "disk_template", "ip", "mac", "bridge",
3382                                     "sda_size", "sdb_size", "vcpus", "tags",
3383                                     "network_port", "beparams",
3384                                     r"(disk)\.(size)/([0-9]+)",
3385                                     r"(disk)\.(sizes)", "disk_usage",
3386                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3387                                     r"(nic)\.(macs|ips|bridges)",
3388                                     r"(disk|nic)\.(count)",
3389                                     "serial_no", "hypervisor", "hvparams",] +
3390                                   ["hv/%s" % name
3391                                    for name in constants.HVS_PARAMETERS] +
3392                                   ["be/%s" % name
3393                                    for name in constants.BES_PARAMETERS])
3394   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3395
3396
3397   def ExpandNames(self):
3398     _CheckOutputFields(static=self._FIELDS_STATIC,
3399                        dynamic=self._FIELDS_DYNAMIC,
3400                        selected=self.op.output_fields)
3401
3402     self.needed_locks = {}
3403     self.share_locks[locking.LEVEL_INSTANCE] = 1
3404     self.share_locks[locking.LEVEL_NODE] = 1
3405
3406     if self.op.names:
3407       self.wanted = _GetWantedInstances(self, self.op.names)
3408     else:
3409       self.wanted = locking.ALL_SET
3410
3411     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3412     self.do_locking = self.do_node_query and self.op.use_locking
3413     if self.do_locking:
3414       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3415       self.needed_locks[locking.LEVEL_NODE] = []
3416       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3417
3418   def DeclareLocks(self, level):
3419     if level == locking.LEVEL_NODE and self.do_locking:
3420       self._LockInstancesNodes()
3421
3422   def CheckPrereq(self):
3423     """Check prerequisites.
3424
3425     """
3426     pass
3427
3428   def Exec(self, feedback_fn):
3429     """Computes the list of nodes and their attributes.
3430
3431     """
3432     all_info = self.cfg.GetAllInstancesInfo()
3433     if self.wanted == locking.ALL_SET:
3434       # caller didn't specify instance names, so ordering is not important
3435       if self.do_locking:
3436         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3437       else:
3438         instance_names = all_info.keys()
3439       instance_names = utils.NiceSort(instance_names)
3440     else:
3441       # caller did specify names, so we must keep the ordering
3442       if self.do_locking:
3443         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3444       else:
3445         tgt_set = all_info.keys()
3446       missing = set(self.wanted).difference(tgt_set)
3447       if missing:
3448         raise errors.OpExecError("Some instances were removed before"
3449                                  " retrieving their data: %s" % missing)
3450       instance_names = self.wanted
3451
3452     instance_list = [all_info[iname] for iname in instance_names]
3453
3454     # begin data gathering
3455
3456     nodes = frozenset([inst.primary_node for inst in instance_list])
3457     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3458
3459     bad_nodes = []
3460     off_nodes = []
3461     if self.do_node_query:
3462       live_data = {}
3463       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3464       for name in nodes:
3465         result = node_data[name]
3466         if result.offline:
3467           # offline nodes will be in both lists
3468           off_nodes.append(name)
3469         if result.failed or result.RemoteFailMsg():
3470           bad_nodes.append(name)
3471         else:
3472           if result.payload:
3473             live_data.update(result.payload)
3474           # else no instance is alive
3475     else:
3476       live_data = dict([(name, {}) for name in instance_names])
3477
3478     # end data gathering
3479
3480     HVPREFIX = "hv/"
3481     BEPREFIX = "be/"
3482     output = []
3483     for instance in instance_list:
3484       iout = []
3485       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3486       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3487       for field in self.op.output_fields:
3488         st_match = self._FIELDS_STATIC.Matches(field)
3489         if field == "name":
3490           val = instance.name
3491         elif field == "os":
3492           val = instance.os
3493         elif field == "pnode":
3494           val = instance.primary_node
3495         elif field == "snodes":
3496           val = list(instance.secondary_nodes)
3497         elif field == "admin_state":
3498           val = instance.admin_up
3499         elif field == "oper_state":
3500           if instance.primary_node in bad_nodes:
3501             val = None
3502           else:
3503             val = bool(live_data.get(instance.name))
3504         elif field == "status":
3505           if instance.primary_node in off_nodes:
3506             val = "ERROR_nodeoffline"
3507           elif instance.primary_node in bad_nodes:
3508             val = "ERROR_nodedown"
3509           else:
3510             running = bool(live_data.get(instance.name))
3511             if running:
3512               if instance.admin_up:
3513                 val = "running"
3514               else:
3515                 val = "ERROR_up"
3516             else:
3517               if instance.admin_up:
3518                 val = "ERROR_down"
3519               else:
3520                 val = "ADMIN_down"
3521         elif field == "oper_ram":
3522           if instance.primary_node in bad_nodes:
3523             val = None
3524           elif instance.name in live_data:
3525             val = live_data[instance.name].get("memory", "?")
3526           else:
3527             val = "-"
3528         elif field == "disk_template":
3529           val = instance.disk_template
3530         elif field == "ip":
3531           val = instance.nics[0].ip
3532         elif field == "bridge":
3533           val = instance.nics[0].bridge
3534         elif field == "mac":
3535           val = instance.nics[0].mac
3536         elif field == "sda_size" or field == "sdb_size":
3537           idx = ord(field[2]) - ord('a')
3538           try:
3539             val = instance.FindDisk(idx).size
3540           except errors.OpPrereqError:
3541             val = None
3542         elif field == "disk_usage": # total disk usage per node
3543           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3544           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3545         elif field == "tags":
3546           val = list(instance.GetTags())
3547         elif field == "serial_no":
3548           val = instance.serial_no
3549         elif field == "network_port":
3550           val = instance.network_port
3551         elif field == "hypervisor":
3552           val = instance.hypervisor
3553         elif field == "hvparams":
3554           val = i_hv
3555         elif (field.startswith(HVPREFIX) and
3556               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3557           val = i_hv.get(field[len(HVPREFIX):], None)
3558         elif field == "beparams":
3559           val = i_be
3560         elif (field.startswith(BEPREFIX) and
3561               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3562           val = i_be.get(field[len(BEPREFIX):], None)
3563         elif st_match and st_match.groups():
3564           # matches a variable list
3565           st_groups = st_match.groups()
3566           if st_groups and st_groups[0] == "disk":
3567             if st_groups[1] == "count":
3568               val = len(instance.disks)
3569             elif st_groups[1] == "sizes":
3570               val = [disk.size for disk in instance.disks]
3571             elif st_groups[1] == "size":
3572               try:
3573                 val = instance.FindDisk(st_groups[2]).size
3574               except errors.OpPrereqError:
3575                 val = None
3576             else:
3577               assert False, "Unhandled disk parameter"
3578           elif st_groups[0] == "nic":
3579             if st_groups[1] == "count":
3580               val = len(instance.nics)
3581             elif st_groups[1] == "macs":
3582               val = [nic.mac for nic in instance.nics]
3583             elif st_groups[1] == "ips":
3584               val = [nic.ip for nic in instance.nics]
3585             elif st_groups[1] == "bridges":
3586               val = [nic.bridge for nic in instance.nics]
3587             else:
3588               # index-based item
3589               nic_idx = int(st_groups[2])
3590               if nic_idx >= len(instance.nics):
3591                 val = None
3592               else:
3593                 if st_groups[1] == "mac":
3594                   val = instance.nics[nic_idx].mac
3595                 elif st_groups[1] == "ip":
3596                   val = instance.nics[nic_idx].ip
3597                 elif st_groups[1] == "bridge":
3598                   val = instance.nics[nic_idx].bridge
3599                 else:
3600                   assert False, "Unhandled NIC parameter"
3601           else:
3602             assert False, "Unhandled variable parameter"
3603         else:
3604           raise errors.ParameterError(field)
3605         iout.append(val)
3606       output.append(iout)
3607
3608     return output
3609
3610
3611 class LUFailoverInstance(LogicalUnit):
3612   """Failover an instance.
3613
3614   """
3615   HPATH = "instance-failover"
3616   HTYPE = constants.HTYPE_INSTANCE
3617   _OP_REQP = ["instance_name", "ignore_consistency"]
3618   REQ_BGL = False
3619
3620   def ExpandNames(self):
3621     self._ExpandAndLockInstance()
3622     self.needed_locks[locking.LEVEL_NODE] = []
3623     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3624
3625   def DeclareLocks(self, level):
3626     if level == locking.LEVEL_NODE:
3627       self._LockInstancesNodes()
3628
3629   def BuildHooksEnv(self):
3630     """Build hooks env.
3631
3632     This runs on master, primary and secondary nodes of the instance.
3633
3634     """
3635     env = {
3636       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3637       }
3638     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3639     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3640     return env, nl, nl
3641
3642   def CheckPrereq(self):
3643     """Check prerequisites.
3644
3645     This checks that the instance is in the cluster.
3646
3647     """
3648     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3649     assert self.instance is not None, \
3650       "Cannot retrieve locked instance %s" % self.op.instance_name
3651
3652     bep = self.cfg.GetClusterInfo().FillBE(instance)
3653     if instance.disk_template not in constants.DTS_NET_MIRROR:
3654       raise errors.OpPrereqError("Instance's disk layout is not"
3655                                  " network mirrored, cannot failover.")
3656
3657     secondary_nodes = instance.secondary_nodes
3658     if not secondary_nodes:
3659       raise errors.ProgrammerError("no secondary node but using "
3660                                    "a mirrored disk template")
3661
3662     target_node = secondary_nodes[0]
3663     _CheckNodeOnline(self, target_node)
3664     _CheckNodeNotDrained(self, target_node)
3665     # check memory requirements on the secondary node
3666     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3667                          instance.name, bep[constants.BE_MEMORY],
3668                          instance.hypervisor)
3669     # check bridge existance
3670     _CheckInstanceBridgesExist(self, instance, node=target_node)
3671
3672   def Exec(self, feedback_fn):
3673     """Failover an instance.
3674
3675     The failover is done by shutting it down on its present node and
3676     starting it on the secondary.
3677
3678     """
3679     instance = self.instance
3680
3681     source_node = instance.primary_node
3682     target_node = instance.secondary_nodes[0]
3683
3684     feedback_fn("* checking disk consistency between source and target")
3685     for dev in instance.disks:
3686       # for drbd, these are drbd over lvm
3687       if not _CheckDiskConsistency(self, dev, target_node, False):
3688         if instance.admin_up and not self.op.ignore_consistency:
3689           raise errors.OpExecError("Disk %s is degraded on target node,"
3690                                    " aborting failover." % dev.iv_name)
3691
3692     feedback_fn("* shutting down instance on source node")
3693     logging.info("Shutting down instance %s on node %s",
3694                  instance.name, source_node)
3695
3696     result = self.rpc.call_instance_shutdown(source_node, instance)
3697     msg = result.RemoteFailMsg()
3698     if msg:
3699       if self.op.ignore_consistency:
3700         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3701                              " Proceeding anyway. Please make sure node"
3702                              " %s is down. Error details: %s",
3703                              instance.name, source_node, source_node, msg)
3704       else:
3705         raise errors.OpExecError("Could not shutdown instance %s on"
3706                                  " node %s: %s" %
3707                                  (instance.name, source_node, msg))
3708
3709     feedback_fn("* deactivating the instance's disks on source node")
3710     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3711       raise errors.OpExecError("Can't shut down the instance's disks.")
3712
3713     instance.primary_node = target_node
3714     # distribute new instance config to the other nodes
3715     self.cfg.Update(instance)
3716
3717     # Only start the instance if it's marked as up
3718     if instance.admin_up:
3719       feedback_fn("* activating the instance's disks on target node")
3720       logging.info("Starting instance %s on node %s",
3721                    instance.name, target_node)
3722
3723       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3724                                                ignore_secondaries=True)
3725       if not disks_ok:
3726         _ShutdownInstanceDisks(self, instance)
3727         raise errors.OpExecError("Can't activate the instance's disks")
3728
3729       feedback_fn("* starting the instance on the target node")
3730       result = self.rpc.call_instance_start(target_node, instance, None, None)
3731       msg = result.RemoteFailMsg()
3732       if msg:
3733         _ShutdownInstanceDisks(self, instance)
3734         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3735                                  (instance.name, target_node, msg))
3736
3737
3738 class LUMigrateInstance(LogicalUnit):
3739   """Migrate an instance.
3740
3741   This is migration without shutting down, compared to the failover,
3742   which is done with shutdown.
3743
3744   """
3745   HPATH = "instance-migrate"
3746   HTYPE = constants.HTYPE_INSTANCE
3747   _OP_REQP = ["instance_name", "live", "cleanup"]
3748
3749   REQ_BGL = False
3750
3751   def ExpandNames(self):
3752     self._ExpandAndLockInstance()
3753     self.needed_locks[locking.LEVEL_NODE] = []
3754     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3755
3756   def DeclareLocks(self, level):
3757     if level == locking.LEVEL_NODE:
3758       self._LockInstancesNodes()
3759
3760   def BuildHooksEnv(self):
3761     """Build hooks env.
3762
3763     This runs on master, primary and secondary nodes of the instance.
3764
3765     """
3766     env = _BuildInstanceHookEnvByObject(self, self.instance)
3767     env["MIGRATE_LIVE"] = self.op.live
3768     env["MIGRATE_CLEANUP"] = self.op.cleanup
3769     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3770     return env, nl, nl
3771
3772   def CheckPrereq(self):
3773     """Check prerequisites.
3774
3775     This checks that the instance is in the cluster.
3776
3777     """
3778     instance = self.cfg.GetInstanceInfo(
3779       self.cfg.ExpandInstanceName(self.op.instance_name))
3780     if instance is None:
3781       raise errors.OpPrereqError("Instance '%s' not known" %
3782                                  self.op.instance_name)
3783
3784     if instance.disk_template != constants.DT_DRBD8:
3785       raise errors.OpPrereqError("Instance's disk layout is not"
3786                                  " drbd8, cannot migrate.")
3787
3788     secondary_nodes = instance.secondary_nodes
3789     if not secondary_nodes:
3790       raise errors.ConfigurationError("No secondary node but using"
3791                                       " drbd8 disk template")
3792
3793     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3794
3795     target_node = secondary_nodes[0]
3796     # check memory requirements on the secondary node
3797     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3798                          instance.name, i_be[constants.BE_MEMORY],
3799                          instance.hypervisor)
3800
3801     # check bridge existance
3802     _CheckInstanceBridgesExist(self, instance, node=target_node)
3803
3804     if not self.op.cleanup:
3805       _CheckNodeNotDrained(self, target_node)
3806       result = self.rpc.call_instance_migratable(instance.primary_node,
3807                                                  instance)
3808       msg = result.RemoteFailMsg()
3809       if msg:
3810         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3811                                    msg)
3812
3813     self.instance = instance
3814
3815   def _WaitUntilSync(self):
3816     """Poll with custom rpc for disk sync.
3817
3818     This uses our own step-based rpc call.
3819
3820     """
3821     self.feedback_fn("* wait until resync is done")
3822     all_done = False
3823     while not all_done:
3824       all_done = True
3825       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3826                                             self.nodes_ip,
3827                                             self.instance.disks)
3828       min_percent = 100
3829       for node, nres in result.items():
3830         msg = nres.RemoteFailMsg()
3831         if msg:
3832           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3833                                    (node, msg))
3834         node_done, node_percent = nres.payload
3835         all_done = all_done and node_done
3836         if node_percent is not None:
3837           min_percent = min(min_percent, node_percent)
3838       if not all_done:
3839         if min_percent < 100:
3840           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3841         time.sleep(2)
3842
3843   def _EnsureSecondary(self, node):
3844     """Demote a node to secondary.
3845
3846     """
3847     self.feedback_fn("* switching node %s to secondary mode" % node)
3848
3849     for dev in self.instance.disks:
3850       self.cfg.SetDiskID(dev, node)
3851
3852     result = self.rpc.call_blockdev_close(node, self.instance.name,
3853                                           self.instance.disks)
3854     msg = result.RemoteFailMsg()
3855     if msg:
3856       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3857                                " error %s" % (node, msg))
3858
3859   def _GoStandalone(self):
3860     """Disconnect from the network.
3861
3862     """
3863     self.feedback_fn("* changing into standalone mode")
3864     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3865                                                self.instance.disks)
3866     for node, nres in result.items():
3867       msg = nres.RemoteFailMsg()
3868       if msg:
3869         raise errors.OpExecError("Cannot disconnect disks node %s,"
3870                                  " error %s" % (node, msg))
3871
3872   def _GoReconnect(self, multimaster):
3873     """Reconnect to the network.
3874
3875     """
3876     if multimaster:
3877       msg = "dual-master"
3878     else:
3879       msg = "single-master"
3880     self.feedback_fn("* changing disks into %s mode" % msg)
3881     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3882                                            self.instance.disks,
3883                                            self.instance.name, multimaster)
3884     for node, nres in result.items():
3885       msg = nres.RemoteFailMsg()
3886       if msg:
3887         raise errors.OpExecError("Cannot change disks config on node %s,"
3888                                  " error: %s" % (node, msg))
3889
3890   def _ExecCleanup(self):
3891     """Try to cleanup after a failed migration.
3892
3893     The cleanup is done by:
3894       - check that the instance is running only on one node
3895         (and update the config if needed)
3896       - change disks on its secondary node to secondary
3897       - wait until disks are fully synchronized
3898       - disconnect from the network
3899       - change disks into single-master mode
3900       - wait again until disks are fully synchronized
3901
3902     """
3903     instance = self.instance
3904     target_node = self.target_node
3905     source_node = self.source_node
3906
3907     # check running on only one node
3908     self.feedback_fn("* checking where the instance actually runs"
3909                      " (if this hangs, the hypervisor might be in"
3910                      " a bad state)")
3911     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3912     for node, result in ins_l.items():
3913       msg = result.RemoteFailMsg()
3914       if msg:
3915         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3916
3917     runningon_source = instance.name in ins_l[source_node].payload
3918     runningon_target = instance.name in ins_l[target_node].payload
3919
3920     if runningon_source and runningon_target:
3921       raise errors.OpExecError("Instance seems to be running on two nodes,"
3922                                " or the hypervisor is confused. You will have"
3923                                " to ensure manually that it runs only on one"
3924                                " and restart this operation.")
3925
3926     if not (runningon_source or runningon_target):
3927       raise errors.OpExecError("Instance does not seem to be running at all."
3928                                " In this case, it's safer to repair by"
3929                                " running 'gnt-instance stop' to ensure disk"
3930                                " shutdown, and then restarting it.")
3931
3932     if runningon_target:
3933       # the migration has actually succeeded, we need to update the config
3934       self.feedback_fn("* instance running on secondary node (%s),"
3935                        " updating config" % target_node)
3936       instance.primary_node = target_node
3937       self.cfg.Update(instance)
3938       demoted_node = source_node
3939     else:
3940       self.feedback_fn("* instance confirmed to be running on its"
3941                        " primary node (%s)" % source_node)
3942       demoted_node = target_node
3943
3944     self._EnsureSecondary(demoted_node)
3945     try:
3946       self._WaitUntilSync()
3947     except errors.OpExecError:
3948       # we ignore here errors, since if the device is standalone, it
3949       # won't be able to sync
3950       pass
3951     self._GoStandalone()
3952     self._GoReconnect(False)
3953     self._WaitUntilSync()
3954
3955     self.feedback_fn("* done")
3956
3957   def _RevertDiskStatus(self):
3958     """Try to revert the disk status after a failed migration.
3959
3960     """
3961     target_node = self.target_node
3962     try:
3963       self._EnsureSecondary(target_node)
3964       self._GoStandalone()
3965       self._GoReconnect(False)
3966       self._WaitUntilSync()
3967     except errors.OpExecError, err:
3968       self.LogWarning("Migration failed and I can't reconnect the"
3969                       " drives: error '%s'\n"
3970                       "Please look and recover the instance status" %
3971                       str(err))
3972
3973   def _AbortMigration(self):
3974     """Call the hypervisor code to abort a started migration.
3975
3976     """
3977     instance = self.instance
3978     target_node = self.target_node
3979     migration_info = self.migration_info
3980
3981     abort_result = self.rpc.call_finalize_migration(target_node,
3982                                                     instance,
3983                                                     migration_info,
3984                                                     False)
3985     abort_msg = abort_result.RemoteFailMsg()
3986     if abort_msg:
3987       logging.error("Aborting migration failed on target node %s: %s" %
3988                     (target_node, abort_msg))
3989       # Don't raise an exception here, as we stil have to try to revert the
3990       # disk status, even if this step failed.
3991
3992   def _ExecMigration(self):
3993     """Migrate an instance.
3994
3995     The migrate is done by:
3996       - change the disks into dual-master mode
3997       - wait until disks are fully synchronized again
3998       - migrate the instance
3999       - change disks on the new secondary node (the old primary) to secondary
4000       - wait until disks are fully synchronized
4001       - change disks into single-master mode
4002
4003     """
4004     instance = self.instance
4005     target_node = self.target_node
4006     source_node = self.source_node
4007
4008     self.feedback_fn("* checking disk consistency between source and target")
4009     for dev in instance.disks:
4010       if not _CheckDiskConsistency(self, dev, target_node, False):
4011         raise errors.OpExecError("Disk %s is degraded or not fully"
4012                                  " synchronized on target node,"
4013                                  " aborting migrate." % dev.iv_name)
4014
4015     # First get the migration information from the remote node
4016     result = self.rpc.call_migration_info(source_node, instance)
4017     msg = result.RemoteFailMsg()
4018     if msg:
4019       log_err = ("Failed fetching source migration information from %s: %s" %
4020                  (source_node, msg))
4021       logging.error(log_err)
4022       raise errors.OpExecError(log_err)
4023
4024     self.migration_info = migration_info = result.payload
4025
4026     # Then switch the disks to master/master mode
4027     self._EnsureSecondary(target_node)
4028     self._GoStandalone()
4029     self._GoReconnect(True)
4030     self._WaitUntilSync()
4031
4032     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4033     result = self.rpc.call_accept_instance(target_node,
4034                                            instance,
4035                                            migration_info,
4036                                            self.nodes_ip[target_node])
4037
4038     msg = result.RemoteFailMsg()
4039     if msg:
4040       logging.error("Instance pre-migration failed, trying to revert"
4041                     " disk status: %s", msg)
4042       self._AbortMigration()
4043       self._RevertDiskStatus()
4044       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4045                                (instance.name, msg))
4046
4047     self.feedback_fn("* migrating instance to %s" % target_node)
4048     time.sleep(10)
4049     result = self.rpc.call_instance_migrate(source_node, instance,
4050                                             self.nodes_ip[target_node],
4051                                             self.op.live)
4052     msg = result.RemoteFailMsg()
4053     if msg:
4054       logging.error("Instance migration failed, trying to revert"
4055                     " disk status: %s", msg)
4056       self._AbortMigration()
4057       self._RevertDiskStatus()
4058       raise errors.OpExecError("Could not migrate instance %s: %s" %
4059                                (instance.name, msg))
4060     time.sleep(10)
4061
4062     instance.primary_node = target_node
4063     # distribute new instance config to the other nodes
4064     self.cfg.Update(instance)
4065
4066     result = self.rpc.call_finalize_migration(target_node,
4067                                               instance,
4068                                               migration_info,
4069                                               True)
4070     msg = result.RemoteFailMsg()
4071     if msg:
4072       logging.error("Instance migration succeeded, but finalization failed:"
4073                     " %s" % msg)
4074       raise errors.OpExecError("Could not finalize instance migration: %s" %
4075                                msg)
4076
4077     self._EnsureSecondary(source_node)
4078     self._WaitUntilSync()
4079     self._GoStandalone()
4080     self._GoReconnect(False)
4081     self._WaitUntilSync()
4082
4083     self.feedback_fn("* done")
4084
4085   def Exec(self, feedback_fn):
4086     """Perform the migration.
4087
4088     """
4089     self.feedback_fn = feedback_fn
4090
4091     self.source_node = self.instance.primary_node
4092     self.target_node = self.instance.secondary_nodes[0]
4093     self.all_nodes = [self.source_node, self.target_node]
4094     self.nodes_ip = {
4095       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4096       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4097       }
4098     if self.op.cleanup:
4099       return self._ExecCleanup()
4100     else:
4101       return self._ExecMigration()
4102
4103
4104 def _CreateBlockDev(lu, node, instance, device, force_create,
4105                     info, force_open):
4106   """Create a tree of block devices on a given node.
4107
4108   If this device type has to be created on secondaries, create it and
4109   all its children.
4110
4111   If not, just recurse to children keeping the same 'force' value.
4112
4113   @param lu: the lu on whose behalf we execute
4114   @param node: the node on which to create the device
4115   @type instance: L{objects.Instance}
4116   @param instance: the instance which owns the device
4117   @type device: L{objects.Disk}
4118   @param device: the device to create
4119   @type force_create: boolean
4120   @param force_create: whether to force creation of this device; this
4121       will be change to True whenever we find a device which has
4122       CreateOnSecondary() attribute
4123   @param info: the extra 'metadata' we should attach to the device
4124       (this will be represented as a LVM tag)
4125   @type force_open: boolean
4126   @param force_open: this parameter will be passes to the
4127       L{backend.BlockdevCreate} function where it specifies
4128       whether we run on primary or not, and it affects both
4129       the child assembly and the device own Open() execution
4130
4131   """
4132   if device.CreateOnSecondary():
4133     force_create = True
4134
4135   if device.children:
4136     for child in device.children:
4137       _CreateBlockDev(lu, node, instance, child, force_create,
4138                       info, force_open)
4139
4140   if not force_create:
4141     return
4142
4143   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4144
4145
4146 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4147   """Create a single block device on a given node.
4148
4149   This will not recurse over children of the device, so they must be
4150   created in advance.
4151
4152   @param lu: the lu on whose behalf we execute
4153   @param node: the node on which to create the device
4154   @type instance: L{objects.Instance}
4155   @param instance: the instance which owns the device
4156   @type device: L{objects.Disk}
4157   @param device: the device to create
4158   @param info: the extra 'metadata' we should attach to the device
4159       (this will be represented as a LVM tag)
4160   @type force_open: boolean
4161   @param force_open: this parameter will be passes to the
4162       L{backend.BlockdevCreate} function where it specifies
4163       whether we run on primary or not, and it affects both
4164       the child assembly and the device own Open() execution
4165
4166   """
4167   lu.cfg.SetDiskID(device, node)
4168   result = lu.rpc.call_blockdev_create(node, device, device.size,
4169                                        instance.name, force_open, info)
4170   msg = result.RemoteFailMsg()
4171   if msg:
4172     raise errors.OpExecError("Can't create block device %s on"
4173                              " node %s for instance %s: %s" %
4174                              (device, node, instance.name, msg))
4175   if device.physical_id is None:
4176     device.physical_id = result.payload
4177
4178
4179 def _GenerateUniqueNames(lu, exts):
4180   """Generate a suitable LV name.
4181
4182   This will generate a logical volume name for the given instance.
4183
4184   """
4185   results = []
4186   for val in exts:
4187     new_id = lu.cfg.GenerateUniqueID()
4188     results.append("%s%s" % (new_id, val))
4189   return results
4190
4191
4192 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4193                          p_minor, s_minor):
4194   """Generate a drbd8 device complete with its children.
4195
4196   """
4197   port = lu.cfg.AllocatePort()
4198   vgname = lu.cfg.GetVGName()
4199   shared_secret = lu.cfg.GenerateDRBDSecret()
4200   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4201                           logical_id=(vgname, names[0]))
4202   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4203                           logical_id=(vgname, names[1]))
4204   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4205                           logical_id=(primary, secondary, port,
4206                                       p_minor, s_minor,
4207                                       shared_secret),
4208                           children=[dev_data, dev_meta],
4209                           iv_name=iv_name)
4210   return drbd_dev
4211
4212
4213 def _GenerateDiskTemplate(lu, template_name,
4214                           instance_name, primary_node,
4215                           secondary_nodes, disk_info,
4216                           file_storage_dir, file_driver,
4217                           base_index):
4218   """Generate the entire disk layout for a given template type.
4219
4220   """
4221   #TODO: compute space requirements
4222
4223   vgname = lu.cfg.GetVGName()
4224   disk_count = len(disk_info)
4225   disks = []
4226   if template_name == constants.DT_DISKLESS:
4227     pass
4228   elif template_name == constants.DT_PLAIN:
4229     if len(secondary_nodes) != 0:
4230       raise errors.ProgrammerError("Wrong template configuration")
4231
4232     names = _GenerateUniqueNames(lu, [".disk%d" % i
4233                                       for i in range(disk_count)])
4234     for idx, disk in enumerate(disk_info):
4235       disk_index = idx + base_index
4236       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4237                               logical_id=(vgname, names[idx]),
4238                               iv_name="disk/%d" % disk_index,
4239                               mode=disk["mode"])
4240       disks.append(disk_dev)
4241   elif template_name == constants.DT_DRBD8:
4242     if len(secondary_nodes) != 1:
4243       raise errors.ProgrammerError("Wrong template configuration")
4244     remote_node = secondary_nodes[0]
4245     minors = lu.cfg.AllocateDRBDMinor(
4246       [primary_node, remote_node] * len(disk_info), instance_name)
4247
4248     names = []
4249     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4250                                                for i in range(disk_count)]):
4251       names.append(lv_prefix + "_data")
4252       names.append(lv_prefix + "_meta")
4253     for idx, disk in enumerate(disk_info):
4254       disk_index = idx + base_index
4255       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4256                                       disk["size"], names[idx*2:idx*2+2],
4257                                       "disk/%d" % disk_index,
4258                                       minors[idx*2], minors[idx*2+1])
4259       disk_dev.mode = disk["mode"]
4260       disks.append(disk_dev)
4261   elif template_name == constants.DT_FILE:
4262     if len(secondary_nodes) != 0:
4263       raise errors.ProgrammerError("Wrong template configuration")
4264
4265     for idx, disk in enumerate(disk_info):
4266       disk_index = idx + base_index
4267       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4268                               iv_name="disk/%d" % disk_index,
4269                               logical_id=(file_driver,
4270                                           "%s/disk%d" % (file_storage_dir,
4271                                                          disk_index)),
4272                               mode=disk["mode"])
4273       disks.append(disk_dev)
4274   else:
4275     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4276   return disks
4277
4278
4279 def _GetInstanceInfoText(instance):
4280   """Compute that text that should be added to the disk's metadata.
4281
4282   """
4283   return "originstname+%s" % instance.name
4284
4285
4286 def _CreateDisks(lu, instance):
4287   """Create all disks for an instance.
4288
4289   This abstracts away some work from AddInstance.
4290
4291   @type lu: L{LogicalUnit}
4292   @param lu: the logical unit on whose behalf we execute
4293   @type instance: L{objects.Instance}
4294   @param instance: the instance whose disks we should create
4295   @rtype: boolean
4296   @return: the success of the creation
4297
4298   """
4299   info = _GetInstanceInfoText(instance)
4300   pnode = instance.primary_node
4301
4302   if instance.disk_template == constants.DT_FILE:
4303     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4304     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4305
4306     if result.failed or not result.data:
4307       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4308
4309     if not result.data[0]:
4310       raise errors.OpExecError("Failed to create directory '%s'" %
4311                                file_storage_dir)
4312
4313   # Note: this needs to be kept in sync with adding of disks in
4314   # LUSetInstanceParams
4315   for device in instance.disks:
4316     logging.info("Creating volume %s for instance %s",
4317                  device.iv_name, instance.name)
4318     #HARDCODE
4319     for node in instance.all_nodes:
4320       f_create = node == pnode
4321       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4322
4323
4324 def _RemoveDisks(lu, instance):
4325   """Remove all disks for an instance.
4326
4327   This abstracts away some work from `AddInstance()` and
4328   `RemoveInstance()`. Note that in case some of the devices couldn't
4329   be removed, the removal will continue with the other ones (compare
4330   with `_CreateDisks()`).
4331
4332   @type lu: L{LogicalUnit}
4333   @param lu: the logical unit on whose behalf we execute
4334   @type instance: L{objects.Instance}
4335   @param instance: the instance whose disks we should remove
4336   @rtype: boolean
4337   @return: the success of the removal
4338
4339   """
4340   logging.info("Removing block devices for instance %s", instance.name)
4341
4342   all_result = True
4343   for device in instance.disks:
4344     for node, disk in device.ComputeNodeTree(instance.primary_node):
4345       lu.cfg.SetDiskID(disk, node)
4346       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4347       if msg:
4348         lu.LogWarning("Could not remove block device %s on node %s,"
4349                       " continuing anyway: %s", device.iv_name, node, msg)
4350         all_result = False
4351
4352   if instance.disk_template == constants.DT_FILE:
4353     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4354     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4355                                                  file_storage_dir)
4356     if result.failed or not result.data:
4357       logging.error("Could not remove directory '%s'", file_storage_dir)
4358       all_result = False
4359
4360   return all_result
4361
4362
4363 def _ComputeDiskSize(disk_template, disks):
4364   """Compute disk size requirements in the volume group
4365
4366   """
4367   # Required free disk space as a function of disk and swap space
4368   req_size_dict = {
4369     constants.DT_DISKLESS: None,
4370     constants.DT_PLAIN: sum(d["size"] for d in disks),
4371     # 128 MB are added for drbd metadata for each disk
4372     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4373     constants.DT_FILE: None,
4374   }
4375
4376   if disk_template not in req_size_dict:
4377     raise errors.ProgrammerError("Disk template '%s' size requirement"
4378                                  " is unknown" %  disk_template)
4379
4380   return req_size_dict[disk_template]
4381
4382
4383 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4384   """Hypervisor parameter validation.
4385
4386   This function abstract the hypervisor parameter validation to be
4387   used in both instance create and instance modify.
4388
4389   @type lu: L{LogicalUnit}
4390   @param lu: the logical unit for which we check
4391   @type nodenames: list
4392   @param nodenames: the list of nodes on which we should check
4393   @type hvname: string
4394   @param hvname: the name of the hypervisor we should use
4395   @type hvparams: dict
4396   @param hvparams: the parameters which we need to check
4397   @raise errors.OpPrereqError: if the parameters are not valid
4398
4399   """
4400   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4401                                                   hvname,
4402                                                   hvparams)
4403   for node in nodenames:
4404     info = hvinfo[node]
4405     if info.offline:
4406       continue
4407     msg = info.RemoteFailMsg()
4408     if msg:
4409       raise errors.OpPrereqError("Hypervisor parameter validation"
4410                                  " failed on node %s: %s" % (node, msg))
4411
4412
4413 class LUCreateInstance(LogicalUnit):
4414   """Create an instance.
4415
4416   """
4417   HPATH = "instance-add"
4418   HTYPE = constants.HTYPE_INSTANCE
4419   _OP_REQP = ["instance_name", "disks", "disk_template",
4420               "mode", "start",
4421               "wait_for_sync", "ip_check", "nics",
4422               "hvparams", "beparams"]
4423   REQ_BGL = False
4424
4425   def _ExpandNode(self, node):
4426     """Expands and checks one node name.
4427
4428     """
4429     node_full = self.cfg.ExpandNodeName(node)
4430     if node_full is None:
4431       raise errors.OpPrereqError("Unknown node %s" % node)
4432     return node_full
4433
4434   def ExpandNames(self):
4435     """ExpandNames for CreateInstance.
4436
4437     Figure out the right locks for instance creation.
4438
4439     """
4440     self.needed_locks = {}
4441
4442     # set optional parameters to none if they don't exist
4443     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4444       if not hasattr(self.op, attr):
4445         setattr(self.op, attr, None)
4446
4447     # cheap checks, mostly valid constants given
4448
4449     # verify creation mode
4450     if self.op.mode not in (constants.INSTANCE_CREATE,
4451                             constants.INSTANCE_IMPORT):
4452       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4453                                  self.op.mode)
4454
4455     # disk template and mirror node verification
4456     if self.op.disk_template not in constants.DISK_TEMPLATES:
4457       raise errors.OpPrereqError("Invalid disk template name")
4458
4459     if self.op.hypervisor is None:
4460       self.op.hypervisor = self.cfg.GetHypervisorType()
4461
4462     cluster = self.cfg.GetClusterInfo()
4463     enabled_hvs = cluster.enabled_hypervisors
4464     if self.op.hypervisor not in enabled_hvs:
4465       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4466                                  " cluster (%s)" % (self.op.hypervisor,
4467                                   ",".join(enabled_hvs)))
4468
4469     # check hypervisor parameter syntax (locally)
4470     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4471     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4472                                   self.op.hvparams)
4473     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4474     hv_type.CheckParameterSyntax(filled_hvp)
4475
4476     # fill and remember the beparams dict
4477     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4478     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4479                                     self.op.beparams)
4480
4481     #### instance parameters check
4482
4483     # instance name verification
4484     hostname1 = utils.HostInfo(self.op.instance_name)
4485     self.op.instance_name = instance_name = hostname1.name
4486
4487     # this is just a preventive check, but someone might still add this
4488     # instance in the meantime, and creation will fail at lock-add time
4489     if instance_name in self.cfg.GetInstanceList():
4490       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4491                                  instance_name)
4492
4493     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4494
4495     # NIC buildup
4496     self.nics = []
4497     for idx, nic in enumerate(self.op.nics):
4498       nic_mode_req = nic.get("mode", None)
4499       nic_mode = nic_mode_req
4500       if nic_mode is None:
4501         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4502
4503       # in routed mode, for the first nic, the default ip is 'auto'
4504       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4505         default_ip_mode = constants.VALUE_AUTO
4506       else:
4507         default_ip_mode = constants.VALUE_NONE
4508
4509       # ip validity checks
4510       ip = nic.get("ip", default_ip_mode)
4511       if ip is None or ip.lower() == constants.VALUE_NONE:
4512         nic_ip = None
4513       elif ip.lower() == constants.VALUE_AUTO:
4514         nic_ip = hostname1.ip
4515       else:
4516         if not utils.IsValidIP(ip):
4517           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4518                                      " like a valid IP" % ip)
4519         nic_ip = ip
4520
4521       # TODO: check the ip for uniqueness !!
4522       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4523         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4524
4525       # MAC address verification
4526       mac = nic.get("mac", constants.VALUE_AUTO)
4527       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4528         if not utils.IsValidMac(mac.lower()):
4529           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4530                                      mac)
4531       # bridge verification
4532       bridge = nic.get("bridge", None)
4533       link = nic.get("link", None)
4534       if bridge and link:
4535         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4536       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4537         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4538       elif bridge:
4539         link = bridge
4540
4541       nicparams = {}
4542       if nic_mode_req:
4543         nicparams[constants.NIC_MODE] = nic_mode_req
4544       if link:
4545         nicparams[constants.NIC_LINK] = link
4546
4547       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4548                                       nicparams)
4549       objects.NIC.CheckParameterSyntax(check_params)
4550       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4551
4552     # disk checks/pre-build
4553     self.disks = []
4554     for disk in self.op.disks:
4555       mode = disk.get("mode", constants.DISK_RDWR)
4556       if mode not in constants.DISK_ACCESS_SET:
4557         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4558                                    mode)
4559       size = disk.get("size", None)
4560       if size is None:
4561         raise errors.OpPrereqError("Missing disk size")
4562       try:
4563         size = int(size)
4564       except ValueError:
4565         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4566       self.disks.append({"size": size, "mode": mode})
4567
4568     # used in CheckPrereq for ip ping check
4569     self.check_ip = hostname1.ip
4570
4571     # file storage checks
4572     if (self.op.file_driver and
4573         not self.op.file_driver in constants.FILE_DRIVER):
4574       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4575                                  self.op.file_driver)
4576
4577     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4578       raise errors.OpPrereqError("File storage directory path not absolute")
4579
4580     ### Node/iallocator related checks
4581     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4582       raise errors.OpPrereqError("One and only one of iallocator and primary"
4583                                  " node must be given")
4584
4585     if self.op.iallocator:
4586       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4587     else:
4588       self.op.pnode = self._ExpandNode(self.op.pnode)
4589       nodelist = [self.op.pnode]
4590       if self.op.snode is not None:
4591         self.op.snode = self._ExpandNode(self.op.snode)
4592         nodelist.append(self.op.snode)
4593       self.needed_locks[locking.LEVEL_NODE] = nodelist
4594
4595     # in case of import lock the source node too
4596     if self.op.mode == constants.INSTANCE_IMPORT:
4597       src_node = getattr(self.op, "src_node", None)
4598       src_path = getattr(self.op, "src_path", None)
4599
4600       if src_path is None:
4601         self.op.src_path = src_path = self.op.instance_name
4602
4603       if src_node is None:
4604         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4605         self.op.src_node = None
4606         if os.path.isabs(src_path):
4607           raise errors.OpPrereqError("Importing an instance from an absolute"
4608                                      " path requires a source node option.")
4609       else:
4610         self.op.src_node = src_node = self._ExpandNode(src_node)
4611         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4612           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4613         if not os.path.isabs(src_path):
4614           self.op.src_path = src_path = \
4615             os.path.join(constants.EXPORT_DIR, src_path)
4616
4617     else: # INSTANCE_CREATE
4618       if getattr(self.op, "os_type", None) is None:
4619         raise errors.OpPrereqError("No guest OS specified")
4620
4621   def _RunAllocator(self):
4622     """Run the allocator based on input opcode.
4623
4624     """
4625     nics = [n.ToDict() for n in self.nics]
4626     ial = IAllocator(self,
4627                      mode=constants.IALLOCATOR_MODE_ALLOC,
4628                      name=self.op.instance_name,
4629                      disk_template=self.op.disk_template,
4630                      tags=[],
4631                      os=self.op.os_type,
4632                      vcpus=self.be_full[constants.BE_VCPUS],
4633                      mem_size=self.be_full[constants.BE_MEMORY],
4634                      disks=self.disks,
4635                      nics=nics,
4636                      hypervisor=self.op.hypervisor,
4637                      )
4638
4639     ial.Run(self.op.iallocator)
4640
4641     if not ial.success:
4642       raise errors.OpPrereqError("Can't compute nodes using"
4643                                  " iallocator '%s': %s" % (self.op.iallocator,
4644                                                            ial.info))
4645     if len(ial.nodes) != ial.required_nodes:
4646       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4647                                  " of nodes (%s), required %s" %
4648                                  (self.op.iallocator, len(ial.nodes),
4649                                   ial.required_nodes))
4650     self.op.pnode = ial.nodes[0]
4651     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4652                  self.op.instance_name, self.op.iallocator,
4653                  ", ".join(ial.nodes))
4654     if ial.required_nodes == 2:
4655       self.op.snode = ial.nodes[1]
4656
4657   def BuildHooksEnv(self):
4658     """Build hooks env.
4659
4660     This runs on master, primary and secondary nodes of the instance.
4661
4662     """
4663     env = {
4664       "ADD_MODE": self.op.mode,
4665       }
4666     if self.op.mode == constants.INSTANCE_IMPORT:
4667       env["SRC_NODE"] = self.op.src_node
4668       env["SRC_PATH"] = self.op.src_path
4669       env["SRC_IMAGES"] = self.src_images
4670
4671     env.update(_BuildInstanceHookEnv(
4672       name=self.op.instance_name,
4673       primary_node=self.op.pnode,
4674       secondary_nodes=self.secondaries,
4675       status=self.op.start,
4676       os_type=self.op.os_type,
4677       memory=self.be_full[constants.BE_MEMORY],
4678       vcpus=self.be_full[constants.BE_VCPUS],
4679       nics=_PreBuildNICHooksList(self, self.nics),
4680       disk_template=self.op.disk_template,
4681       disks=[(d["size"], d["mode"]) for d in self.disks],
4682     ))
4683
4684     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4685           self.secondaries)
4686     return env, nl, nl
4687
4688
4689   def CheckPrereq(self):
4690     """Check prerequisites.
4691
4692     """
4693     if (not self.cfg.GetVGName() and
4694         self.op.disk_template not in constants.DTS_NOT_LVM):
4695       raise errors.OpPrereqError("Cluster does not support lvm-based"
4696                                  " instances")
4697
4698     if self.op.mode == constants.INSTANCE_IMPORT:
4699       src_node = self.op.src_node
4700       src_path = self.op.src_path
4701
4702       if src_node is None:
4703         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4704         exp_list = self.rpc.call_export_list(locked_nodes)
4705         found = False
4706         for node in exp_list:
4707           if exp_list[node].RemoteFailMsg():
4708             continue
4709           if src_path in exp_list[node].payload:
4710             found = True
4711             self.op.src_node = src_node = node
4712             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4713                                                        src_path)
4714             break
4715         if not found:
4716           raise errors.OpPrereqError("No export found for relative path %s" %
4717                                       src_path)
4718
4719       _CheckNodeOnline(self, src_node)
4720       result = self.rpc.call_export_info(src_node, src_path)
4721       msg = result.RemoteFailMsg()
4722       if msg:
4723         raise errors.OpPrereqError("No export or invalid export found in"
4724                                    " dir %s: %s" % (src_path, msg))
4725
4726       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4727       if not export_info.has_section(constants.INISECT_EXP):
4728         raise errors.ProgrammerError("Corrupted export config")
4729
4730       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4731       if (int(ei_version) != constants.EXPORT_VERSION):
4732         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4733                                    (ei_version, constants.EXPORT_VERSION))
4734
4735       # Check that the new instance doesn't have less disks than the export
4736       instance_disks = len(self.disks)
4737       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4738       if instance_disks < export_disks:
4739         raise errors.OpPrereqError("Not enough disks to import."
4740                                    " (instance: %d, export: %d)" %
4741                                    (instance_disks, export_disks))
4742
4743       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4744       disk_images = []
4745       for idx in range(export_disks):
4746         option = 'disk%d_dump' % idx
4747         if export_info.has_option(constants.INISECT_INS, option):
4748           # FIXME: are the old os-es, disk sizes, etc. useful?
4749           export_name = export_info.get(constants.INISECT_INS, option)
4750           image = os.path.join(src_path, export_name)
4751           disk_images.append(image)
4752         else:
4753           disk_images.append(False)
4754
4755       self.src_images = disk_images
4756
4757       old_name = export_info.get(constants.INISECT_INS, 'name')
4758       # FIXME: int() here could throw a ValueError on broken exports
4759       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4760       if self.op.instance_name == old_name:
4761         for idx, nic in enumerate(self.nics):
4762           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4763             nic_mac_ini = 'nic%d_mac' % idx
4764             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4765
4766     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4767     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4768     if self.op.start and not self.op.ip_check:
4769       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4770                                  " adding an instance in start mode")
4771
4772     if self.op.ip_check:
4773       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4774         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4775                                    (self.check_ip, self.op.instance_name))
4776
4777     #### mac address generation
4778     # By generating here the mac address both the allocator and the hooks get
4779     # the real final mac address rather than the 'auto' or 'generate' value.
4780     # There is a race condition between the generation and the instance object
4781     # creation, which means that we know the mac is valid now, but we're not
4782     # sure it will be when we actually add the instance. If things go bad
4783     # adding the instance will abort because of a duplicate mac, and the
4784     # creation job will fail.
4785     for nic in self.nics:
4786       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4787         nic.mac = self.cfg.GenerateMAC()
4788
4789     #### allocator run
4790
4791     if self.op.iallocator is not None:
4792       self._RunAllocator()
4793
4794     #### node related checks
4795
4796     # check primary node
4797     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4798     assert self.pnode is not None, \
4799       "Cannot retrieve locked node %s" % self.op.pnode
4800     if pnode.offline:
4801       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4802                                  pnode.name)
4803     if pnode.drained:
4804       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4805                                  pnode.name)
4806
4807     self.secondaries = []
4808
4809     # mirror node verification
4810     if self.op.disk_template in constants.DTS_NET_MIRROR:
4811       if self.op.snode is None:
4812         raise errors.OpPrereqError("The networked disk templates need"
4813                                    " a mirror node")
4814       if self.op.snode == pnode.name:
4815         raise errors.OpPrereqError("The secondary node cannot be"
4816                                    " the primary node.")
4817       _CheckNodeOnline(self, self.op.snode)
4818       _CheckNodeNotDrained(self, self.op.snode)
4819       self.secondaries.append(self.op.snode)
4820
4821     nodenames = [pnode.name] + self.secondaries
4822
4823     req_size = _ComputeDiskSize(self.op.disk_template,
4824                                 self.disks)
4825
4826     # Check lv size requirements
4827     if req_size is not None:
4828       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4829                                          self.op.hypervisor)
4830       for node in nodenames:
4831         info = nodeinfo[node]
4832         msg = info.RemoteFailMsg()
4833         if msg:
4834           raise errors.OpPrereqError("Cannot get current information"
4835                                      " from node %s: %s" % (node, msg))
4836         info = info.payload
4837         vg_free = info.get('vg_free', None)
4838         if not isinstance(vg_free, int):
4839           raise errors.OpPrereqError("Can't compute free disk space on"
4840                                      " node %s" % node)
4841         if req_size > vg_free:
4842           raise errors.OpPrereqError("Not enough disk space on target node %s."
4843                                      " %d MB available, %d MB required" %
4844                                      (node, vg_free, req_size))
4845
4846     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4847
4848     # os verification
4849     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4850     result.Raise()
4851     if not isinstance(result.data, objects.OS):
4852       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4853                                  " primary node"  % self.op.os_type)
4854
4855     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4856
4857     # memory check on primary node
4858     if self.op.start:
4859       _CheckNodeFreeMemory(self, self.pnode.name,
4860                            "creating instance %s" % self.op.instance_name,
4861                            self.be_full[constants.BE_MEMORY],
4862                            self.op.hypervisor)
4863
4864   def Exec(self, feedback_fn):
4865     """Create and add the instance to the cluster.
4866
4867     """
4868     instance = self.op.instance_name
4869     pnode_name = self.pnode.name
4870
4871     ht_kind = self.op.hypervisor
4872     if ht_kind in constants.HTS_REQ_PORT:
4873       network_port = self.cfg.AllocatePort()
4874     else:
4875       network_port = None
4876
4877     ##if self.op.vnc_bind_address is None:
4878     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4879
4880     # this is needed because os.path.join does not accept None arguments
4881     if self.op.file_storage_dir is None:
4882       string_file_storage_dir = ""
4883     else:
4884       string_file_storage_dir = self.op.file_storage_dir
4885
4886     # build the full file storage dir path
4887     file_storage_dir = os.path.normpath(os.path.join(
4888                                         self.cfg.GetFileStorageDir(),
4889                                         string_file_storage_dir, instance))
4890
4891
4892     disks = _GenerateDiskTemplate(self,
4893                                   self.op.disk_template,
4894                                   instance, pnode_name,
4895                                   self.secondaries,
4896                                   self.disks,
4897                                   file_storage_dir,
4898                                   self.op.file_driver,
4899                                   0)
4900
4901     iobj = objects.Instance(name=instance, os=self.op.os_type,
4902                             primary_node=pnode_name,
4903                             nics=self.nics, disks=disks,
4904                             disk_template=self.op.disk_template,
4905                             admin_up=False,
4906                             network_port=network_port,
4907                             beparams=self.op.beparams,
4908                             hvparams=self.op.hvparams,
4909                             hypervisor=self.op.hypervisor,
4910                             )
4911
4912     feedback_fn("* creating instance disks...")
4913     try:
4914       _CreateDisks(self, iobj)
4915     except errors.OpExecError:
4916       self.LogWarning("Device creation failed, reverting...")
4917       try:
4918         _RemoveDisks(self, iobj)
4919       finally:
4920         self.cfg.ReleaseDRBDMinors(instance)
4921         raise
4922
4923     feedback_fn("adding instance %s to cluster config" % instance)
4924
4925     self.cfg.AddInstance(iobj)
4926     # Declare that we don't want to remove the instance lock anymore, as we've
4927     # added the instance to the config
4928     del self.remove_locks[locking.LEVEL_INSTANCE]
4929     # Unlock all the nodes
4930     if self.op.mode == constants.INSTANCE_IMPORT:
4931       nodes_keep = [self.op.src_node]
4932       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4933                        if node != self.op.src_node]
4934       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4935       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4936     else:
4937       self.context.glm.release(locking.LEVEL_NODE)
4938       del self.acquired_locks[locking.LEVEL_NODE]
4939
4940     if self.op.wait_for_sync:
4941       disk_abort = not _WaitForSync(self, iobj)
4942     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4943       # make sure the disks are not degraded (still sync-ing is ok)
4944       time.sleep(15)
4945       feedback_fn("* checking mirrors status")
4946       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4947     else:
4948       disk_abort = False
4949
4950     if disk_abort:
4951       _RemoveDisks(self, iobj)
4952       self.cfg.RemoveInstance(iobj.name)
4953       # Make sure the instance lock gets removed
4954       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4955       raise errors.OpExecError("There are some degraded disks for"
4956                                " this instance")
4957
4958     feedback_fn("creating os for instance %s on node %s" %
4959                 (instance, pnode_name))
4960
4961     if iobj.disk_template != constants.DT_DISKLESS:
4962       if self.op.mode == constants.INSTANCE_CREATE:
4963         feedback_fn("* running the instance OS create scripts...")
4964         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4965         msg = result.RemoteFailMsg()
4966         if msg:
4967           raise errors.OpExecError("Could not add os for instance %s"
4968                                    " on node %s: %s" %
4969                                    (instance, pnode_name, msg))
4970
4971       elif self.op.mode == constants.INSTANCE_IMPORT:
4972         feedback_fn("* running the instance OS import scripts...")
4973         src_node = self.op.src_node
4974         src_images = self.src_images
4975         cluster_name = self.cfg.GetClusterName()
4976         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4977                                                          src_node, src_images,
4978                                                          cluster_name)
4979         msg = import_result.RemoteFailMsg()
4980         if msg:
4981           self.LogWarning("Error while importing the disk images for instance"
4982                           " %s on node %s: %s" % (instance, pnode_name, msg))
4983       else:
4984         # also checked in the prereq part
4985         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4986                                      % self.op.mode)
4987
4988     if self.op.start:
4989       iobj.admin_up = True
4990       self.cfg.Update(iobj)
4991       logging.info("Starting instance %s on node %s", instance, pnode_name)
4992       feedback_fn("* starting instance...")
4993       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4994       msg = result.RemoteFailMsg()
4995       if msg:
4996         raise errors.OpExecError("Could not start instance: %s" % msg)
4997
4998
4999 class LUConnectConsole(NoHooksLU):
5000   """Connect to an instance's console.
5001
5002   This is somewhat special in that it returns the command line that
5003   you need to run on the master node in order to connect to the
5004   console.
5005
5006   """
5007   _OP_REQP = ["instance_name"]
5008   REQ_BGL = False
5009
5010   def ExpandNames(self):
5011     self._ExpandAndLockInstance()
5012
5013   def CheckPrereq(self):
5014     """Check prerequisites.
5015
5016     This checks that the instance is in the cluster.
5017
5018     """
5019     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5020     assert self.instance is not None, \
5021       "Cannot retrieve locked instance %s" % self.op.instance_name
5022     _CheckNodeOnline(self, self.instance.primary_node)
5023
5024   def Exec(self, feedback_fn):
5025     """Connect to the console of an instance
5026
5027     """
5028     instance = self.instance
5029     node = instance.primary_node
5030
5031     node_insts = self.rpc.call_instance_list([node],
5032                                              [instance.hypervisor])[node]
5033     msg = node_insts.RemoteFailMsg()
5034     if msg:
5035       raise errors.OpExecError("Can't get node information from %s: %s" %
5036                                (node, msg))
5037
5038     if instance.name not in node_insts.payload:
5039       raise errors.OpExecError("Instance %s is not running." % instance.name)
5040
5041     logging.debug("Connecting to console of %s on %s", instance.name, node)
5042
5043     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5044     cluster = self.cfg.GetClusterInfo()
5045     # beparams and hvparams are passed separately, to avoid editing the
5046     # instance and then saving the defaults in the instance itself.
5047     hvparams = cluster.FillHV(instance)
5048     beparams = cluster.FillBE(instance)
5049     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5050
5051     # build ssh cmdline
5052     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5053
5054
5055 class LUReplaceDisks(LogicalUnit):
5056   """Replace the disks of an instance.
5057
5058   """
5059   HPATH = "mirrors-replace"
5060   HTYPE = constants.HTYPE_INSTANCE
5061   _OP_REQP = ["instance_name", "mode", "disks"]
5062   REQ_BGL = False
5063
5064   def CheckArguments(self):
5065     if not hasattr(self.op, "remote_node"):
5066       self.op.remote_node = None
5067     if not hasattr(self.op, "iallocator"):
5068       self.op.iallocator = None
5069
5070     # check for valid parameter combination
5071     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5072     if self.op.mode == constants.REPLACE_DISK_CHG:
5073       if cnt == 2:
5074         raise errors.OpPrereqError("When changing the secondary either an"
5075                                    " iallocator script must be used or the"
5076                                    " new node given")
5077       elif cnt == 0:
5078         raise errors.OpPrereqError("Give either the iallocator or the new"
5079                                    " secondary, not both")
5080     else: # not replacing the secondary
5081       if cnt != 2:
5082         raise errors.OpPrereqError("The iallocator and new node options can"
5083                                    " be used only when changing the"
5084                                    " secondary node")
5085
5086   def ExpandNames(self):
5087     self._ExpandAndLockInstance()
5088
5089     if self.op.iallocator is not None:
5090       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5091     elif self.op.remote_node is not None:
5092       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5093       if remote_node is None:
5094         raise errors.OpPrereqError("Node '%s' not known" %
5095                                    self.op.remote_node)
5096       self.op.remote_node = remote_node
5097       # Warning: do not remove the locking of the new secondary here
5098       # unless DRBD8.AddChildren is changed to work in parallel;
5099       # currently it doesn't since parallel invocations of
5100       # FindUnusedMinor will conflict
5101       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5102       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5103     else:
5104       self.needed_locks[locking.LEVEL_NODE] = []
5105       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5106
5107   def DeclareLocks(self, level):
5108     # If we're not already locking all nodes in the set we have to declare the
5109     # instance's primary/secondary nodes.
5110     if (level == locking.LEVEL_NODE and
5111         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5112       self._LockInstancesNodes()
5113
5114   def _RunAllocator(self):
5115     """Compute a new secondary node using an IAllocator.
5116
5117     """
5118     ial = IAllocator(self,
5119                      mode=constants.IALLOCATOR_MODE_RELOC,
5120                      name=self.op.instance_name,
5121                      relocate_from=[self.sec_node])
5122
5123     ial.Run(self.op.iallocator)
5124
5125     if not ial.success:
5126       raise errors.OpPrereqError("Can't compute nodes using"
5127                                  " iallocator '%s': %s" % (self.op.iallocator,
5128                                                            ial.info))
5129     if len(ial.nodes) != ial.required_nodes:
5130       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5131                                  " of nodes (%s), required %s" %
5132                                  (len(ial.nodes), ial.required_nodes))
5133     self.op.remote_node = ial.nodes[0]
5134     self.LogInfo("Selected new secondary for the instance: %s",
5135                  self.op.remote_node)
5136
5137   def BuildHooksEnv(self):
5138     """Build hooks env.
5139
5140     This runs on the master, the primary and all the secondaries.
5141
5142     """
5143     env = {
5144       "MODE": self.op.mode,
5145       "NEW_SECONDARY": self.op.remote_node,
5146       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5147       }
5148     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5149     nl = [
5150       self.cfg.GetMasterNode(),
5151       self.instance.primary_node,
5152       ]
5153     if self.op.remote_node is not None:
5154       nl.append(self.op.remote_node)
5155     return env, nl, nl
5156
5157   def CheckPrereq(self):
5158     """Check prerequisites.
5159
5160     This checks that the instance is in the cluster.
5161
5162     """
5163     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5164     assert instance is not None, \
5165       "Cannot retrieve locked instance %s" % self.op.instance_name
5166     self.instance = instance
5167
5168     if instance.disk_template != constants.DT_DRBD8:
5169       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5170                                  " instances")
5171
5172     if len(instance.secondary_nodes) != 1:
5173       raise errors.OpPrereqError("The instance has a strange layout,"
5174                                  " expected one secondary but found %d" %
5175                                  len(instance.secondary_nodes))
5176
5177     self.sec_node = instance.secondary_nodes[0]
5178
5179     if self.op.iallocator is not None:
5180       self._RunAllocator()
5181
5182     remote_node = self.op.remote_node
5183     if remote_node is not None:
5184       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5185       assert self.remote_node_info is not None, \
5186         "Cannot retrieve locked node %s" % remote_node
5187     else:
5188       self.remote_node_info = None
5189     if remote_node == instance.primary_node:
5190       raise errors.OpPrereqError("The specified node is the primary node of"
5191                                  " the instance.")
5192     elif remote_node == self.sec_node:
5193       raise errors.OpPrereqError("The specified node is already the"
5194                                  " secondary node of the instance.")
5195
5196     if self.op.mode == constants.REPLACE_DISK_PRI:
5197       n1 = self.tgt_node = instance.primary_node
5198       n2 = self.oth_node = self.sec_node
5199     elif self.op.mode == constants.REPLACE_DISK_SEC:
5200       n1 = self.tgt_node = self.sec_node
5201       n2 = self.oth_node = instance.primary_node
5202     elif self.op.mode == constants.REPLACE_DISK_CHG:
5203       n1 = self.new_node = remote_node
5204       n2 = self.oth_node = instance.primary_node
5205       self.tgt_node = self.sec_node
5206       _CheckNodeNotDrained(self, remote_node)
5207     else:
5208       raise errors.ProgrammerError("Unhandled disk replace mode")
5209
5210     _CheckNodeOnline(self, n1)
5211     _CheckNodeOnline(self, n2)
5212
5213     if not self.op.disks:
5214       self.op.disks = range(len(instance.disks))
5215
5216     for disk_idx in self.op.disks:
5217       instance.FindDisk(disk_idx)
5218
5219   def _ExecD8DiskOnly(self, feedback_fn):
5220     """Replace a disk on the primary or secondary for dbrd8.
5221
5222     The algorithm for replace is quite complicated:
5223
5224       1. for each disk to be replaced:
5225
5226         1. create new LVs on the target node with unique names
5227         1. detach old LVs from the drbd device
5228         1. rename old LVs to name_replaced.<time_t>
5229         1. rename new LVs to old LVs
5230         1. attach the new LVs (with the old names now) to the drbd device
5231
5232       1. wait for sync across all devices
5233
5234       1. for each modified disk:
5235
5236         1. remove old LVs (which have the name name_replaces.<time_t>)
5237
5238     Failures are not very well handled.
5239
5240     """
5241     steps_total = 6
5242     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5243     instance = self.instance
5244     iv_names = {}
5245     vgname = self.cfg.GetVGName()
5246     # start of work
5247     cfg = self.cfg
5248     tgt_node = self.tgt_node
5249     oth_node = self.oth_node
5250
5251     # Step: check device activation
5252     self.proc.LogStep(1, steps_total, "check device existence")
5253     info("checking volume groups")
5254     my_vg = cfg.GetVGName()
5255     results = self.rpc.call_vg_list([oth_node, tgt_node])
5256     if not results:
5257       raise errors.OpExecError("Can't list volume groups on the nodes")
5258     for node in oth_node, tgt_node:
5259       res = results[node]
5260       msg = res.RemoteFailMsg()
5261       if msg:
5262         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5263       if my_vg not in res.payload:
5264         raise errors.OpExecError("Volume group '%s' not found on %s" %
5265                                  (my_vg, node))
5266     for idx, dev in enumerate(instance.disks):
5267       if idx not in self.op.disks:
5268         continue
5269       for node in tgt_node, oth_node:
5270         info("checking disk/%d on %s" % (idx, node))
5271         cfg.SetDiskID(dev, node)
5272         result = self.rpc.call_blockdev_find(node, dev)
5273         msg = result.RemoteFailMsg()
5274         if not msg and not result.payload:
5275           msg = "disk not found"
5276         if msg:
5277           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5278                                    (idx, node, msg))
5279
5280     # Step: check other node consistency
5281     self.proc.LogStep(2, steps_total, "check peer consistency")
5282     for idx, dev in enumerate(instance.disks):
5283       if idx not in self.op.disks:
5284         continue
5285       info("checking disk/%d consistency on %s" % (idx, oth_node))
5286       if not _CheckDiskConsistency(self, dev, oth_node,
5287                                    oth_node==instance.primary_node):
5288         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5289                                  " to replace disks on this node (%s)" %
5290                                  (oth_node, tgt_node))
5291
5292     # Step: create new storage
5293     self.proc.LogStep(3, steps_total, "allocate new storage")
5294     for idx, dev in enumerate(instance.disks):
5295       if idx not in self.op.disks:
5296         continue
5297       size = dev.size
5298       cfg.SetDiskID(dev, tgt_node)
5299       lv_names = [".disk%d_%s" % (idx, suf)
5300                   for suf in ["data", "meta"]]
5301       names = _GenerateUniqueNames(self, lv_names)
5302       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5303                              logical_id=(vgname, names[0]))
5304       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5305                              logical_id=(vgname, names[1]))
5306       new_lvs = [lv_data, lv_meta]
5307       old_lvs = dev.children
5308       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5309       info("creating new local storage on %s for %s" %
5310            (tgt_node, dev.iv_name))
5311       # we pass force_create=True to force the LVM creation
5312       for new_lv in new_lvs:
5313         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5314                         _GetInstanceInfoText(instance), False)
5315
5316     # Step: for each lv, detach+rename*2+attach
5317     self.proc.LogStep(4, steps_total, "change drbd configuration")
5318     for dev, old_lvs, new_lvs in iv_names.itervalues():
5319       info("detaching %s drbd from local storage" % dev.iv_name)
5320       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5321       msg = result.RemoteFailMsg()
5322       if msg:
5323         raise errors.OpExecError("Can't detach drbd from local storage on node"
5324                                  " %s for device %s: %s" %
5325                                  (tgt_node, dev.iv_name, msg))
5326       #dev.children = []
5327       #cfg.Update(instance)
5328
5329       # ok, we created the new LVs, so now we know we have the needed
5330       # storage; as such, we proceed on the target node to rename
5331       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5332       # using the assumption that logical_id == physical_id (which in
5333       # turn is the unique_id on that node)
5334
5335       # FIXME(iustin): use a better name for the replaced LVs
5336       temp_suffix = int(time.time())
5337       ren_fn = lambda d, suff: (d.physical_id[0],
5338                                 d.physical_id[1] + "_replaced-%s" % suff)
5339       # build the rename list based on what LVs exist on the node
5340       rlist = []
5341       for to_ren in old_lvs:
5342         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5343         if not result.RemoteFailMsg() and result.payload:
5344           # device exists
5345           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5346
5347       info("renaming the old LVs on the target node")
5348       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5349       msg = result.RemoteFailMsg()
5350       if msg:
5351         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5352                                  (tgt_node, msg))
5353       # now we rename the new LVs to the old LVs
5354       info("renaming the new LVs on the target node")
5355       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5356       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5357       msg = result.RemoteFailMsg()
5358       if msg:
5359         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5360                                  (tgt_node, msg))
5361
5362       for old, new in zip(old_lvs, new_lvs):
5363         new.logical_id = old.logical_id
5364         cfg.SetDiskID(new, tgt_node)
5365
5366       for disk in old_lvs:
5367         disk.logical_id = ren_fn(disk, temp_suffix)
5368         cfg.SetDiskID(disk, tgt_node)
5369
5370       # now that the new lvs have the old name, we can add them to the device
5371       info("adding new mirror component on %s" % tgt_node)
5372       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5373       msg = result.RemoteFailMsg()
5374       if msg:
5375         for new_lv in new_lvs:
5376           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5377           if msg:
5378             warning("Can't rollback device %s: %s", dev, msg,
5379                     hint="cleanup manually the unused logical volumes")
5380         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5381
5382       dev.children = new_lvs
5383       cfg.Update(instance)
5384
5385     # Step: wait for sync
5386
5387     # this can fail as the old devices are degraded and _WaitForSync
5388     # does a combined result over all disks, so we don't check its
5389     # return value
5390     self.proc.LogStep(5, steps_total, "sync devices")
5391     _WaitForSync(self, instance, unlock=True)
5392
5393     # so check manually all the devices
5394     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5395       cfg.SetDiskID(dev, instance.primary_node)
5396       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5397       msg = result.RemoteFailMsg()
5398       if not msg and not result.payload:
5399         msg = "disk not found"
5400       if msg:
5401         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5402                                  (name, msg))
5403       if result.payload[5]:
5404         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5405
5406     # Step: remove old storage
5407     self.proc.LogStep(6, steps_total, "removing old storage")
5408     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5409       info("remove logical volumes for %s" % name)
5410       for lv in old_lvs:
5411         cfg.SetDiskID(lv, tgt_node)
5412         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5413         if msg:
5414           warning("Can't remove old LV: %s" % msg,
5415                   hint="manually remove unused LVs")
5416           continue
5417
5418   def _ExecD8Secondary(self, feedback_fn):
5419     """Replace the secondary node for drbd8.
5420
5421     The algorithm for replace is quite complicated:
5422       - for all disks of the instance:
5423         - create new LVs on the new node with same names
5424         - shutdown the drbd device on the old secondary
5425         - disconnect the drbd network on the primary
5426         - create the drbd device on the new secondary
5427         - network attach the drbd on the primary, using an artifice:
5428           the drbd code for Attach() will connect to the network if it
5429           finds a device which is connected to the good local disks but
5430           not network enabled
5431       - wait for sync across all devices
5432       - remove all disks from the old secondary
5433
5434     Failures are not very well handled.
5435
5436     """
5437     steps_total = 6
5438     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5439     instance = self.instance
5440     iv_names = {}
5441     # start of work
5442     cfg = self.cfg
5443     old_node = self.tgt_node
5444     new_node = self.new_node
5445     pri_node = instance.primary_node
5446     nodes_ip = {
5447       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5448       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5449       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5450       }
5451
5452     # Step: check device activation
5453     self.proc.LogStep(1, steps_total, "check device existence")
5454     info("checking volume groups")
5455     my_vg = cfg.GetVGName()
5456     results = self.rpc.call_vg_list([pri_node, new_node])
5457     for node in pri_node, new_node:
5458       res = results[node]
5459       msg = res.RemoteFailMsg()
5460       if msg:
5461         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5462       if my_vg not in res.payload:
5463         raise errors.OpExecError("Volume group '%s' not found on %s" %
5464                                  (my_vg, node))
5465     for idx, dev in enumerate(instance.disks):
5466       if idx not in self.op.disks:
5467         continue
5468       info("checking disk/%d on %s" % (idx, pri_node))
5469       cfg.SetDiskID(dev, pri_node)
5470       result = self.rpc.call_blockdev_find(pri_node, dev)
5471       msg = result.RemoteFailMsg()
5472       if not msg and not result.payload:
5473         msg = "disk not found"
5474       if msg:
5475         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5476                                  (idx, pri_node, msg))
5477
5478     # Step: check other node consistency
5479     self.proc.LogStep(2, steps_total, "check peer consistency")
5480     for idx, dev in enumerate(instance.disks):
5481       if idx not in self.op.disks:
5482         continue
5483       info("checking disk/%d consistency on %s" % (idx, pri_node))
5484       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5485         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5486                                  " unsafe to replace the secondary" %
5487                                  pri_node)
5488
5489     # Step: create new storage
5490     self.proc.LogStep(3, steps_total, "allocate new storage")
5491     for idx, dev in enumerate(instance.disks):
5492       info("adding new local storage on %s for disk/%d" %
5493            (new_node, idx))
5494       # we pass force_create=True to force LVM creation
5495       for new_lv in dev.children:
5496         _CreateBlockDev(self, new_node, instance, new_lv, True,
5497                         _GetInstanceInfoText(instance), False)
5498
5499     # Step 4: dbrd minors and drbd setups changes
5500     # after this, we must manually remove the drbd minors on both the
5501     # error and the success paths
5502     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5503                                    instance.name)
5504     logging.debug("Allocated minors %s" % (minors,))
5505     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5506     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5507       size = dev.size
5508       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5509       # create new devices on new_node; note that we create two IDs:
5510       # one without port, so the drbd will be activated without
5511       # networking information on the new node at this stage, and one
5512       # with network, for the latter activation in step 4
5513       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5514       if pri_node == o_node1:
5515         p_minor = o_minor1
5516       else:
5517         p_minor = o_minor2
5518
5519       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5520       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5521
5522       iv_names[idx] = (dev, dev.children, new_net_id)
5523       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5524                     new_net_id)
5525       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5526                               logical_id=new_alone_id,
5527                               children=dev.children)
5528       try:
5529         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5530                               _GetInstanceInfoText(instance), False)
5531       except errors.GenericError:
5532         self.cfg.ReleaseDRBDMinors(instance.name)
5533         raise
5534
5535     for idx, dev in enumerate(instance.disks):
5536       # we have new devices, shutdown the drbd on the old secondary
5537       info("shutting down drbd for disk/%d on old node" % idx)
5538       cfg.SetDiskID(dev, old_node)
5539       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5540       if msg:
5541         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5542                 (idx, msg),
5543                 hint="Please cleanup this device manually as soon as possible")
5544
5545     info("detaching primary drbds from the network (=> standalone)")
5546     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5547                                                instance.disks)[pri_node]
5548
5549     msg = result.RemoteFailMsg()
5550     if msg:
5551       # detaches didn't succeed (unlikely)
5552       self.cfg.ReleaseDRBDMinors(instance.name)
5553       raise errors.OpExecError("Can't detach the disks from the network on"
5554                                " old node: %s" % (msg,))
5555
5556     # if we managed to detach at least one, we update all the disks of
5557     # the instance to point to the new secondary
5558     info("updating instance configuration")
5559     for dev, _, new_logical_id in iv_names.itervalues():
5560       dev.logical_id = new_logical_id
5561       cfg.SetDiskID(dev, pri_node)
5562     cfg.Update(instance)
5563
5564     # and now perform the drbd attach
5565     info("attaching primary drbds to new secondary (standalone => connected)")
5566     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5567                                            instance.disks, instance.name,
5568                                            False)
5569     for to_node, to_result in result.items():
5570       msg = to_result.RemoteFailMsg()
5571       if msg:
5572         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5573                 hint="please do a gnt-instance info to see the"
5574                 " status of disks")
5575
5576     # this can fail as the old devices are degraded and _WaitForSync
5577     # does a combined result over all disks, so we don't check its
5578     # return value
5579     self.proc.LogStep(5, steps_total, "sync devices")
5580     _WaitForSync(self, instance, unlock=True)
5581
5582     # so check manually all the devices
5583     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5584       cfg.SetDiskID(dev, pri_node)
5585       result = self.rpc.call_blockdev_find(pri_node, dev)
5586       msg = result.RemoteFailMsg()
5587       if not msg and not result.payload:
5588         msg = "disk not found"
5589       if msg:
5590         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5591                                  (idx, msg))
5592       if result.payload[5]:
5593         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5594
5595     self.proc.LogStep(6, steps_total, "removing old storage")
5596     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5597       info("remove logical volumes for disk/%d" % idx)
5598       for lv in old_lvs:
5599         cfg.SetDiskID(lv, old_node)
5600         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5601         if msg:
5602           warning("Can't remove LV on old secondary: %s", msg,
5603                   hint="Cleanup stale volumes by hand")
5604
5605   def Exec(self, feedback_fn):
5606     """Execute disk replacement.
5607
5608     This dispatches the disk replacement to the appropriate handler.
5609
5610     """
5611     instance = self.instance
5612
5613     # Activate the instance disks if we're replacing them on a down instance
5614     if not instance.admin_up:
5615       _StartInstanceDisks(self, instance, True)
5616
5617     if self.op.mode == constants.REPLACE_DISK_CHG:
5618       fn = self._ExecD8Secondary
5619     else:
5620       fn = self._ExecD8DiskOnly
5621
5622     ret = fn(feedback_fn)
5623
5624     # Deactivate the instance disks if we're replacing them on a down instance
5625     if not instance.admin_up:
5626       _SafeShutdownInstanceDisks(self, instance)
5627
5628     return ret
5629
5630
5631 class LUGrowDisk(LogicalUnit):
5632   """Grow a disk of an instance.
5633
5634   """
5635   HPATH = "disk-grow"
5636   HTYPE = constants.HTYPE_INSTANCE
5637   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5638   REQ_BGL = False
5639
5640   def ExpandNames(self):
5641     self._ExpandAndLockInstance()
5642     self.needed_locks[locking.LEVEL_NODE] = []
5643     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5644
5645   def DeclareLocks(self, level):
5646     if level == locking.LEVEL_NODE:
5647       self._LockInstancesNodes()
5648
5649   def BuildHooksEnv(self):
5650     """Build hooks env.
5651
5652     This runs on the master, the primary and all the secondaries.
5653
5654     """
5655     env = {
5656       "DISK": self.op.disk,
5657       "AMOUNT": self.op.amount,
5658       }
5659     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5660     nl = [
5661       self.cfg.GetMasterNode(),
5662       self.instance.primary_node,
5663       ]
5664     return env, nl, nl
5665
5666   def CheckPrereq(self):
5667     """Check prerequisites.
5668
5669     This checks that the instance is in the cluster.
5670
5671     """
5672     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5673     assert instance is not None, \
5674       "Cannot retrieve locked instance %s" % self.op.instance_name
5675     nodenames = list(instance.all_nodes)
5676     for node in nodenames:
5677       _CheckNodeOnline(self, node)
5678
5679
5680     self.instance = instance
5681
5682     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5683       raise errors.OpPrereqError("Instance's disk layout does not support"
5684                                  " growing.")
5685
5686     self.disk = instance.FindDisk(self.op.disk)
5687
5688     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5689                                        instance.hypervisor)
5690     for node in nodenames:
5691       info = nodeinfo[node]
5692       msg = info.RemoteFailMsg()
5693       if msg:
5694         raise errors.OpPrereqError("Cannot get current information"
5695                                    " from node %s:" % (node, msg))
5696       vg_free = info.payload.get('vg_free', None)
5697       if not isinstance(vg_free, int):
5698         raise errors.OpPrereqError("Can't compute free disk space on"
5699                                    " node %s" % node)
5700       if self.op.amount > vg_free:
5701         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5702                                    " %d MiB available, %d MiB required" %
5703                                    (node, vg_free, self.op.amount))
5704
5705   def Exec(self, feedback_fn):
5706     """Execute disk grow.
5707
5708     """
5709     instance = self.instance
5710     disk = self.disk
5711     for node in instance.all_nodes:
5712       self.cfg.SetDiskID(disk, node)
5713       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5714       msg = result.RemoteFailMsg()
5715       if msg:
5716         raise errors.OpExecError("Grow request failed to node %s: %s" %
5717                                  (node, msg))
5718     disk.RecordGrow(self.op.amount)
5719     self.cfg.Update(instance)
5720     if self.op.wait_for_sync:
5721       disk_abort = not _WaitForSync(self, instance)
5722       if disk_abort:
5723         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5724                              " status.\nPlease check the instance.")
5725
5726
5727 class LUQueryInstanceData(NoHooksLU):
5728   """Query runtime instance data.
5729
5730   """
5731   _OP_REQP = ["instances", "static"]
5732   REQ_BGL = False
5733
5734   def ExpandNames(self):
5735     self.needed_locks = {}
5736     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5737
5738     if not isinstance(self.op.instances, list):
5739       raise errors.OpPrereqError("Invalid argument type 'instances'")
5740
5741     if self.op.instances:
5742       self.wanted_names = []
5743       for name in self.op.instances:
5744         full_name = self.cfg.ExpandInstanceName(name)
5745         if full_name is None:
5746           raise errors.OpPrereqError("Instance '%s' not known" % name)
5747         self.wanted_names.append(full_name)
5748       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5749     else:
5750       self.wanted_names = None
5751       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5752
5753     self.needed_locks[locking.LEVEL_NODE] = []
5754     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5755
5756   def DeclareLocks(self, level):
5757     if level == locking.LEVEL_NODE:
5758       self._LockInstancesNodes()
5759
5760   def CheckPrereq(self):
5761     """Check prerequisites.
5762
5763     This only checks the optional instance list against the existing names.
5764
5765     """
5766     if self.wanted_names is None:
5767       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5768
5769     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5770                              in self.wanted_names]
5771     return
5772
5773   def _ComputeDiskStatus(self, instance, snode, dev):
5774     """Compute block device status.
5775
5776     """
5777     static = self.op.static
5778     if not static:
5779       self.cfg.SetDiskID(dev, instance.primary_node)
5780       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5781       if dev_pstatus.offline:
5782         dev_pstatus = None
5783       else:
5784         msg = dev_pstatus.RemoteFailMsg()
5785         if msg:
5786           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5787                                    (instance.name, msg))
5788         dev_pstatus = dev_pstatus.payload
5789     else:
5790       dev_pstatus = None
5791
5792     if dev.dev_type in constants.LDS_DRBD:
5793       # we change the snode then (otherwise we use the one passed in)
5794       if dev.logical_id[0] == instance.primary_node:
5795         snode = dev.logical_id[1]
5796       else:
5797         snode = dev.logical_id[0]
5798
5799     if snode and not static:
5800       self.cfg.SetDiskID(dev, snode)
5801       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5802       if dev_sstatus.offline:
5803         dev_sstatus = None
5804       else:
5805         msg = dev_sstatus.RemoteFailMsg()
5806         if msg:
5807           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5808                                    (instance.name, msg))
5809         dev_sstatus = dev_sstatus.payload
5810     else:
5811       dev_sstatus = None
5812
5813     if dev.children:
5814       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5815                       for child in dev.children]
5816     else:
5817       dev_children = []
5818
5819     data = {
5820       "iv_name": dev.iv_name,
5821       "dev_type": dev.dev_type,
5822       "logical_id": dev.logical_id,
5823       "physical_id": dev.physical_id,
5824       "pstatus": dev_pstatus,
5825       "sstatus": dev_sstatus,
5826       "children": dev_children,
5827       "mode": dev.mode,
5828       }
5829
5830     return data
5831
5832   def Exec(self, feedback_fn):
5833     """Gather and return data"""
5834     result = {}
5835
5836     cluster = self.cfg.GetClusterInfo()
5837
5838     for instance in self.wanted_instances:
5839       if not self.op.static:
5840         remote_info = self.rpc.call_instance_info(instance.primary_node,
5841                                                   instance.name,
5842                                                   instance.hypervisor)
5843         msg = remote_info.RemoteFailMsg()
5844         if msg:
5845           raise errors.OpExecError("Error checking node %s: %s" %
5846                                    (instance.primary_node, msg))
5847         remote_info = remote_info.payload
5848         if remote_info and "state" in remote_info:
5849           remote_state = "up"
5850         else:
5851           remote_state = "down"
5852       else:
5853         remote_state = None
5854       if instance.admin_up:
5855         config_state = "up"
5856       else:
5857         config_state = "down"
5858
5859       disks = [self._ComputeDiskStatus(instance, None, device)
5860                for device in instance.disks]
5861
5862       idict = {
5863         "name": instance.name,
5864         "config_state": config_state,
5865         "run_state": remote_state,
5866         "pnode": instance.primary_node,
5867         "snodes": instance.secondary_nodes,
5868         "os": instance.os,
5869         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5870         "disks": disks,
5871         "hypervisor": instance.hypervisor,
5872         "network_port": instance.network_port,
5873         "hv_instance": instance.hvparams,
5874         "hv_actual": cluster.FillHV(instance),
5875         "be_instance": instance.beparams,
5876         "be_actual": cluster.FillBE(instance),
5877         }
5878
5879       result[instance.name] = idict
5880
5881     return result
5882
5883
5884 class LUSetInstanceParams(LogicalUnit):
5885   """Modifies an instances's parameters.
5886
5887   """
5888   HPATH = "instance-modify"
5889   HTYPE = constants.HTYPE_INSTANCE
5890   _OP_REQP = ["instance_name"]
5891   REQ_BGL = False
5892
5893   def CheckArguments(self):
5894     if not hasattr(self.op, 'nics'):
5895       self.op.nics = []
5896     if not hasattr(self.op, 'disks'):
5897       self.op.disks = []
5898     if not hasattr(self.op, 'beparams'):
5899       self.op.beparams = {}
5900     if not hasattr(self.op, 'hvparams'):
5901       self.op.hvparams = {}
5902     self.op.force = getattr(self.op, "force", False)
5903     if not (self.op.nics or self.op.disks or
5904             self.op.hvparams or self.op.beparams):
5905       raise errors.OpPrereqError("No changes submitted")
5906
5907     # Disk validation
5908     disk_addremove = 0
5909     for disk_op, disk_dict in self.op.disks:
5910       if disk_op == constants.DDM_REMOVE:
5911         disk_addremove += 1
5912         continue
5913       elif disk_op == constants.DDM_ADD:
5914         disk_addremove += 1
5915       else:
5916         if not isinstance(disk_op, int):
5917           raise errors.OpPrereqError("Invalid disk index")
5918       if disk_op == constants.DDM_ADD:
5919         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5920         if mode not in constants.DISK_ACCESS_SET:
5921           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5922         size = disk_dict.get('size', None)
5923         if size is None:
5924           raise errors.OpPrereqError("Required disk parameter size missing")
5925         try:
5926           size = int(size)
5927         except ValueError, err:
5928           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5929                                      str(err))
5930         disk_dict['size'] = size
5931       else:
5932         # modification of disk
5933         if 'size' in disk_dict:
5934           raise errors.OpPrereqError("Disk size change not possible, use"
5935                                      " grow-disk")
5936
5937     if disk_addremove > 1:
5938       raise errors.OpPrereqError("Only one disk add or remove operation"
5939                                  " supported at a time")
5940
5941     # NIC validation
5942     nic_addremove = 0
5943     for nic_op, nic_dict in self.op.nics:
5944       if nic_op == constants.DDM_REMOVE:
5945         nic_addremove += 1
5946         continue
5947       elif nic_op == constants.DDM_ADD:
5948         nic_addremove += 1
5949       else:
5950         if not isinstance(nic_op, int):
5951           raise errors.OpPrereqError("Invalid nic index")
5952
5953       # nic_dict should be a dict
5954       nic_ip = nic_dict.get('ip', None)
5955       if nic_ip is not None:
5956         if nic_ip.lower() == constants.VALUE_NONE:
5957           nic_dict['ip'] = None
5958         else:
5959           if not utils.IsValidIP(nic_ip):
5960             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5961
5962       nic_bridge = nic_dict.get('bridge', None)
5963       nic_link = nic_dict.get('link', None)
5964       if nic_bridge and nic_link:
5965         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5966       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5967         nic_dict['bridge'] = None
5968       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5969         nic_dict['link'] = None
5970
5971       if nic_op == constants.DDM_ADD:
5972         nic_mac = nic_dict.get('mac', None)
5973         if nic_mac is None:
5974           nic_dict['mac'] = constants.VALUE_AUTO
5975
5976       if 'mac' in nic_dict:
5977         nic_mac = nic_dict['mac']
5978         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5979           if not utils.IsValidMac(nic_mac):
5980             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5981         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5982           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5983                                      " modifying an existing nic")
5984
5985     if nic_addremove > 1:
5986       raise errors.OpPrereqError("Only one NIC add or remove operation"
5987                                  " supported at a time")
5988
5989   def ExpandNames(self):
5990     self._ExpandAndLockInstance()
5991     self.needed_locks[locking.LEVEL_NODE] = []
5992     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5993
5994   def DeclareLocks(self, level):
5995     if level == locking.LEVEL_NODE:
5996       self._LockInstancesNodes()
5997
5998   def BuildHooksEnv(self):
5999     """Build hooks env.
6000
6001     This runs on the master, primary and secondaries.
6002
6003     """
6004     args = dict()
6005     if constants.BE_MEMORY in self.be_new:
6006       args['memory'] = self.be_new[constants.BE_MEMORY]
6007     if constants.BE_VCPUS in self.be_new:
6008       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6009     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6010     # information at all.
6011     if self.op.nics:
6012       args['nics'] = []
6013       nic_override = dict(self.op.nics)
6014       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6015       for idx, nic in enumerate(self.instance.nics):
6016         if idx in nic_override:
6017           this_nic_override = nic_override[idx]
6018         else:
6019           this_nic_override = {}
6020         if 'ip' in this_nic_override:
6021           ip = this_nic_override['ip']
6022         else:
6023           ip = nic.ip
6024         if 'mac' in this_nic_override:
6025           mac = this_nic_override['mac']
6026         else:
6027           mac = nic.mac
6028         if idx in self.nic_pnew:
6029           nicparams = self.nic_pnew[idx]
6030         else:
6031           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6032         mode = nicparams[constants.NIC_MODE]
6033         link = nicparams[constants.NIC_LINK]
6034         args['nics'].append((ip, mac, mode, link))
6035       if constants.DDM_ADD in nic_override:
6036         ip = nic_override[constants.DDM_ADD].get('ip', None)
6037         mac = nic_override[constants.DDM_ADD]['mac']
6038         nicparams = self.nic_pnew[constants.DDM_ADD]
6039         mode = nicparams[constants.NIC_MODE]
6040         link = nicparams[constants.NIC_LINK]
6041         args['nics'].append((ip, mac, mode, link))
6042       elif constants.DDM_REMOVE in nic_override:
6043         del args['nics'][-1]
6044
6045     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6046     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6047     return env, nl, nl
6048
6049   def _GetUpdatedParams(self, old_params, update_dict,
6050                         default_values, parameter_types):
6051     """Return the new params dict for the given params.
6052
6053     @type old_params: dict
6054     @type old_params: old parameters
6055     @type update_dict: dict
6056     @type update_dict: dict containing new parameter values,
6057                        or constants.VALUE_DEFAULT to reset the
6058                        parameter to its default value
6059     @type default_values: dict
6060     @param default_values: default values for the filled parameters
6061     @type parameter_types: dict
6062     @param parameter_types: dict mapping target dict keys to types
6063                             in constants.ENFORCEABLE_TYPES
6064     @rtype: (dict, dict)
6065     @return: (new_parameters, filled_parameters)
6066
6067     """
6068     params_copy = copy.deepcopy(old_params)
6069     for key, val in update_dict.iteritems():
6070       if val == constants.VALUE_DEFAULT:
6071         try:
6072           del params_copy[key]
6073         except KeyError:
6074           pass
6075       else:
6076         params_copy[key] = val
6077     utils.ForceDictType(params_copy, parameter_types)
6078     params_filled = objects.FillDict(default_values, params_copy)
6079     return (params_copy, params_filled)
6080
6081   def CheckPrereq(self):
6082     """Check prerequisites.
6083
6084     This only checks the instance list against the existing names.
6085
6086     """
6087     force = self.force = self.op.force
6088
6089     # checking the new params on the primary/secondary nodes
6090
6091     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6092     cluster = self.cluster = self.cfg.GetClusterInfo()
6093     assert self.instance is not None, \
6094       "Cannot retrieve locked instance %s" % self.op.instance_name
6095     pnode = instance.primary_node
6096     nodelist = list(instance.all_nodes)
6097
6098     # hvparams processing
6099     if self.op.hvparams:
6100       i_hvdict, hv_new = self._GetUpdatedParams(
6101                              instance.hvparams, self.op.hvparams,
6102                              cluster.hvparams[instance.hypervisor],
6103                              constants.HVS_PARAMETER_TYPES)
6104       # local check
6105       hypervisor.GetHypervisor(
6106         instance.hypervisor).CheckParameterSyntax(hv_new)
6107       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6108       self.hv_new = hv_new # the new actual values
6109       self.hv_inst = i_hvdict # the new dict (without defaults)
6110     else:
6111       self.hv_new = self.hv_inst = {}
6112
6113     # beparams processing
6114     if self.op.beparams:
6115       i_bedict, be_new = self._GetUpdatedParams(
6116                              instance.beparams, self.op.beparams,
6117                              cluster.beparams[constants.PP_DEFAULT],
6118                              constants.BES_PARAMETER_TYPES)
6119       self.be_new = be_new # the new actual values
6120       self.be_inst = i_bedict # the new dict (without defaults)
6121     else:
6122       self.be_new = self.be_inst = {}
6123
6124     self.warn = []
6125
6126     if constants.BE_MEMORY in self.op.beparams and not self.force:
6127       mem_check_list = [pnode]
6128       if be_new[constants.BE_AUTO_BALANCE]:
6129         # either we changed auto_balance to yes or it was from before
6130         mem_check_list.extend(instance.secondary_nodes)
6131       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6132                                                   instance.hypervisor)
6133       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6134                                          instance.hypervisor)
6135       pninfo = nodeinfo[pnode]
6136       msg = pninfo.RemoteFailMsg()
6137       if msg:
6138         # Assume the primary node is unreachable and go ahead
6139         self.warn.append("Can't get info from primary node %s: %s" %
6140                          (pnode,  msg))
6141       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6142         self.warn.append("Node data from primary node %s doesn't contain"
6143                          " free memory information" % pnode)
6144       elif instance_info.RemoteFailMsg():
6145         self.warn.append("Can't get instance runtime information: %s" %
6146                         instance_info.RemoteFailMsg())
6147       else:
6148         if instance_info.payload:
6149           current_mem = int(instance_info.payload['memory'])
6150         else:
6151           # Assume instance not running
6152           # (there is a slight race condition here, but it's not very probable,
6153           # and we have no other way to check)
6154           current_mem = 0
6155         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6156                     pninfo.payload['memory_free'])
6157         if miss_mem > 0:
6158           raise errors.OpPrereqError("This change will prevent the instance"
6159                                      " from starting, due to %d MB of memory"
6160                                      " missing on its primary node" % miss_mem)
6161
6162       if be_new[constants.BE_AUTO_BALANCE]:
6163         for node, nres in nodeinfo.items():
6164           if node not in instance.secondary_nodes:
6165             continue
6166           msg = nres.RemoteFailMsg()
6167           if msg:
6168             self.warn.append("Can't get info from secondary node %s: %s" %
6169                              (node, msg))
6170           elif not isinstance(nres.payload.get('memory_free', None), int):
6171             self.warn.append("Secondary node %s didn't return free"
6172                              " memory information" % node)
6173           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6174             self.warn.append("Not enough memory to failover instance to"
6175                              " secondary node %s" % node)
6176
6177     # NIC processing
6178     self.nic_pnew = {}
6179     self.nic_pinst = {}
6180     for nic_op, nic_dict in self.op.nics:
6181       if nic_op == constants.DDM_REMOVE:
6182         if not instance.nics:
6183           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6184         continue
6185       if nic_op != constants.DDM_ADD:
6186         # an existing nic
6187         if nic_op < 0 or nic_op >= len(instance.nics):
6188           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6189                                      " are 0 to %d" %
6190                                      (nic_op, len(instance.nics)))
6191         old_nic_params = instance.nics[nic_op].nicparams
6192         old_nic_ip = instance.nics[nic_op].ip
6193       else:
6194         old_nic_params = {}
6195         old_nic_ip = None
6196
6197       update_params_dict = dict([(key, nic_dict[key])
6198                                  for key in constants.NICS_PARAMETERS
6199                                  if key in nic_dict])
6200
6201       if 'bridge' in nic_dict:
6202         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6203
6204       new_nic_params, new_filled_nic_params = \
6205           self._GetUpdatedParams(old_nic_params, update_params_dict,
6206                                  cluster.nicparams[constants.PP_DEFAULT],
6207                                  constants.NICS_PARAMETER_TYPES)
6208       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6209       self.nic_pinst[nic_op] = new_nic_params
6210       self.nic_pnew[nic_op] = new_filled_nic_params
6211       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6212
6213       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6214         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6215         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6216         msg = result.RemoteFailMsg()
6217         if msg:
6218           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6219           if self.force:
6220             self.warn.append(msg)
6221           else:
6222             raise errors.OpPrereqError(msg)
6223       if new_nic_mode == constants.NIC_MODE_ROUTED:
6224         if 'ip' in nic_dict:
6225           nic_ip = nic_dict['ip']
6226         else:
6227           nic_ip = old_nic_ip
6228         if nic_ip is None:
6229           raise errors.OpPrereqError('Cannot set the nic ip to None'
6230                                      ' on a routed nic')
6231       if 'mac' in nic_dict:
6232         nic_mac = nic_dict['mac']
6233         if nic_mac is None:
6234           raise errors.OpPrereqError('Cannot set the nic mac to None')
6235         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6236           # otherwise generate the mac
6237           nic_dict['mac'] = self.cfg.GenerateMAC()
6238         else:
6239           # or validate/reserve the current one
6240           if self.cfg.IsMacInUse(nic_mac):
6241             raise errors.OpPrereqError("MAC address %s already in use"
6242                                        " in cluster" % nic_mac)
6243
6244     # DISK processing
6245     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6246       raise errors.OpPrereqError("Disk operations not supported for"
6247                                  " diskless instances")
6248     for disk_op, disk_dict in self.op.disks:
6249       if disk_op == constants.DDM_REMOVE:
6250         if len(instance.disks) == 1:
6251           raise errors.OpPrereqError("Cannot remove the last disk of"
6252                                      " an instance")
6253         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6254         ins_l = ins_l[pnode]
6255         msg = ins_l.RemoteFailMsg()
6256         if msg:
6257           raise errors.OpPrereqError("Can't contact node %s: %s" %
6258                                      (pnode, msg))
6259         if instance.name in ins_l.payload:
6260           raise errors.OpPrereqError("Instance is running, can't remove"
6261                                      " disks.")
6262
6263       if (disk_op == constants.DDM_ADD and
6264           len(instance.nics) >= constants.MAX_DISKS):
6265         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6266                                    " add more" % constants.MAX_DISKS)
6267       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6268         # an existing disk
6269         if disk_op < 0 or disk_op >= len(instance.disks):
6270           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6271                                      " are 0 to %d" %
6272                                      (disk_op, len(instance.disks)))
6273
6274     return
6275
6276   def Exec(self, feedback_fn):
6277     """Modifies an instance.
6278
6279     All parameters take effect only at the next restart of the instance.
6280
6281     """
6282     # Process here the warnings from CheckPrereq, as we don't have a
6283     # feedback_fn there.
6284     for warn in self.warn:
6285       feedback_fn("WARNING: %s" % warn)
6286
6287     result = []
6288     instance = self.instance
6289     cluster = self.cluster
6290     # disk changes
6291     for disk_op, disk_dict in self.op.disks:
6292       if disk_op == constants.DDM_REMOVE:
6293         # remove the last disk
6294         device = instance.disks.pop()
6295         device_idx = len(instance.disks)
6296         for node, disk in device.ComputeNodeTree(instance.primary_node):
6297           self.cfg.SetDiskID(disk, node)
6298           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6299           if msg:
6300             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6301                             " continuing anyway", device_idx, node, msg)
6302         result.append(("disk/%d" % device_idx, "remove"))
6303       elif disk_op == constants.DDM_ADD:
6304         # add a new disk
6305         if instance.disk_template == constants.DT_FILE:
6306           file_driver, file_path = instance.disks[0].logical_id
6307           file_path = os.path.dirname(file_path)
6308         else:
6309           file_driver = file_path = None
6310         disk_idx_base = len(instance.disks)
6311         new_disk = _GenerateDiskTemplate(self,
6312                                          instance.disk_template,
6313                                          instance.name, instance.primary_node,
6314                                          instance.secondary_nodes,
6315                                          [disk_dict],
6316                                          file_path,
6317                                          file_driver,
6318                                          disk_idx_base)[0]
6319         instance.disks.append(new_disk)
6320         info = _GetInstanceInfoText(instance)
6321
6322         logging.info("Creating volume %s for instance %s",
6323                      new_disk.iv_name, instance.name)
6324         # Note: this needs to be kept in sync with _CreateDisks
6325         #HARDCODE
6326         for node in instance.all_nodes:
6327           f_create = node == instance.primary_node
6328           try:
6329             _CreateBlockDev(self, node, instance, new_disk,
6330                             f_create, info, f_create)
6331           except errors.OpExecError, err:
6332             self.LogWarning("Failed to create volume %s (%s) on"
6333                             " node %s: %s",
6334                             new_disk.iv_name, new_disk, node, err)
6335         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6336                        (new_disk.size, new_disk.mode)))
6337       else:
6338         # change a given disk
6339         instance.disks[disk_op].mode = disk_dict['mode']
6340         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6341     # NIC changes
6342     for nic_op, nic_dict in self.op.nics:
6343       if nic_op == constants.DDM_REMOVE:
6344         # remove the last nic
6345         del instance.nics[-1]
6346         result.append(("nic.%d" % len(instance.nics), "remove"))
6347       elif nic_op == constants.DDM_ADD:
6348         # mac and bridge should be set, by now
6349         mac = nic_dict['mac']
6350         ip = nic_dict.get('ip', None)
6351         nicparams = self.nic_pinst[constants.DDM_ADD]
6352         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6353         instance.nics.append(new_nic)
6354         result.append(("nic.%d" % (len(instance.nics) - 1),
6355                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6356                        (new_nic.mac, new_nic.ip,
6357                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6358                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6359                        )))
6360       else:
6361         for key in 'mac', 'ip':
6362           if key in nic_dict:
6363             setattr(instance.nics[nic_op], key, nic_dict[key])
6364         if nic_op in self.nic_pnew:
6365           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6366         for key, val in nic_dict.iteritems():
6367           result.append(("nic.%s/%d" % (key, nic_op), val))
6368
6369     # hvparams changes
6370     if self.op.hvparams:
6371       instance.hvparams = self.hv_inst
6372       for key, val in self.op.hvparams.iteritems():
6373         result.append(("hv/%s" % key, val))
6374
6375     # beparams changes
6376     if self.op.beparams:
6377       instance.beparams = self.be_inst
6378       for key, val in self.op.beparams.iteritems():
6379         result.append(("be/%s" % key, val))
6380
6381     self.cfg.Update(instance)
6382
6383     return result
6384
6385
6386 class LUQueryExports(NoHooksLU):
6387   """Query the exports list
6388
6389   """
6390   _OP_REQP = ['nodes']
6391   REQ_BGL = False
6392
6393   def ExpandNames(self):
6394     self.needed_locks = {}
6395     self.share_locks[locking.LEVEL_NODE] = 1
6396     if not self.op.nodes:
6397       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6398     else:
6399       self.needed_locks[locking.LEVEL_NODE] = \
6400         _GetWantedNodes(self, self.op.nodes)
6401
6402   def CheckPrereq(self):
6403     """Check prerequisites.
6404
6405     """
6406     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6407
6408   def Exec(self, feedback_fn):
6409     """Compute the list of all the exported system images.
6410
6411     @rtype: dict
6412     @return: a dictionary with the structure node->(export-list)
6413         where export-list is a list of the instances exported on
6414         that node.
6415
6416     """
6417     rpcresult = self.rpc.call_export_list(self.nodes)
6418     result = {}
6419     for node in rpcresult:
6420       if rpcresult[node].RemoteFailMsg():
6421         result[node] = False
6422       else:
6423         result[node] = rpcresult[node].payload
6424
6425     return result
6426
6427
6428 class LUExportInstance(LogicalUnit):
6429   """Export an instance to an image in the cluster.
6430
6431   """
6432   HPATH = "instance-export"
6433   HTYPE = constants.HTYPE_INSTANCE
6434   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6435   REQ_BGL = False
6436
6437   def ExpandNames(self):
6438     self._ExpandAndLockInstance()
6439     # FIXME: lock only instance primary and destination node
6440     #
6441     # Sad but true, for now we have do lock all nodes, as we don't know where
6442     # the previous export might be, and and in this LU we search for it and
6443     # remove it from its current node. In the future we could fix this by:
6444     #  - making a tasklet to search (share-lock all), then create the new one,
6445     #    then one to remove, after
6446     #  - removing the removal operation altoghether
6447     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6448
6449   def DeclareLocks(self, level):
6450     """Last minute lock declaration."""
6451     # All nodes are locked anyway, so nothing to do here.
6452
6453   def BuildHooksEnv(self):
6454     """Build hooks env.
6455
6456     This will run on the master, primary node and target node.
6457
6458     """
6459     env = {
6460       "EXPORT_NODE": self.op.target_node,
6461       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6462       }
6463     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6464     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6465           self.op.target_node]
6466     return env, nl, nl
6467
6468   def CheckPrereq(self):
6469     """Check prerequisites.
6470
6471     This checks that the instance and node names are valid.
6472
6473     """
6474     instance_name = self.op.instance_name
6475     self.instance = self.cfg.GetInstanceInfo(instance_name)
6476     assert self.instance is not None, \
6477           "Cannot retrieve locked instance %s" % self.op.instance_name
6478     _CheckNodeOnline(self, self.instance.primary_node)
6479
6480     self.dst_node = self.cfg.GetNodeInfo(
6481       self.cfg.ExpandNodeName(self.op.target_node))
6482
6483     if self.dst_node is None:
6484       # This is wrong node name, not a non-locked node
6485       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6486     _CheckNodeOnline(self, self.dst_node.name)
6487     _CheckNodeNotDrained(self, self.dst_node.name)
6488
6489     # instance disk type verification
6490     for disk in self.instance.disks:
6491       if disk.dev_type == constants.LD_FILE:
6492         raise errors.OpPrereqError("Export not supported for instances with"
6493                                    " file-based disks")
6494
6495   def Exec(self, feedback_fn):
6496     """Export an instance to an image in the cluster.
6497
6498     """
6499     instance = self.instance
6500     dst_node = self.dst_node
6501     src_node = instance.primary_node
6502     if self.op.shutdown:
6503       # shutdown the instance, but not the disks
6504       result = self.rpc.call_instance_shutdown(src_node, instance)
6505       msg = result.RemoteFailMsg()
6506       if msg:
6507         raise errors.OpExecError("Could not shutdown instance %s on"
6508                                  " node %s: %s" %
6509                                  (instance.name, src_node, msg))
6510
6511     vgname = self.cfg.GetVGName()
6512
6513     snap_disks = []
6514
6515     # set the disks ID correctly since call_instance_start needs the
6516     # correct drbd minor to create the symlinks
6517     for disk in instance.disks:
6518       self.cfg.SetDiskID(disk, src_node)
6519
6520     try:
6521       for disk in instance.disks:
6522         # result.payload will be a snapshot of an lvm leaf of the one we passed
6523         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6524         msg = result.RemoteFailMsg()
6525         if msg:
6526           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6527                           disk.logical_id[1], src_node, msg)
6528           snap_disks.append(False)
6529         else:
6530           disk_id = (vgname, result.payload)
6531           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6532                                  logical_id=disk_id, physical_id=disk_id,
6533                                  iv_name=disk.iv_name)
6534           snap_disks.append(new_dev)
6535
6536     finally:
6537       if self.op.shutdown and instance.admin_up:
6538         result = self.rpc.call_instance_start(src_node, instance, None, None)
6539         msg = result.RemoteFailMsg()
6540         if msg:
6541           _ShutdownInstanceDisks(self, instance)
6542           raise errors.OpExecError("Could not start instance: %s" % msg)
6543
6544     # TODO: check for size
6545
6546     cluster_name = self.cfg.GetClusterName()
6547     for idx, dev in enumerate(snap_disks):
6548       if dev:
6549         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6550                                                instance, cluster_name, idx)
6551         msg = result.RemoteFailMsg()
6552         if msg:
6553           self.LogWarning("Could not export block device %s from node %s to"
6554                           " node %s: %s", dev.logical_id[1], src_node,
6555                           dst_node.name, msg)
6556         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6557         if msg:
6558           self.LogWarning("Could not remove snapshot block device %s from node"
6559                           " %s: %s", dev.logical_id[1], src_node, msg)
6560
6561     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6562     msg = result.RemoteFailMsg()
6563     if msg:
6564       self.LogWarning("Could not finalize export for instance %s"
6565                       " on node %s: %s", instance.name, dst_node.name, msg)
6566
6567     nodelist = self.cfg.GetNodeList()
6568     nodelist.remove(dst_node.name)
6569
6570     # on one-node clusters nodelist will be empty after the removal
6571     # if we proceed the backup would be removed because OpQueryExports
6572     # substitutes an empty list with the full cluster node list.
6573     iname = instance.name
6574     if nodelist:
6575       exportlist = self.rpc.call_export_list(nodelist)
6576       for node in exportlist:
6577         if exportlist[node].RemoteFailMsg():
6578           continue
6579         if iname in exportlist[node].payload:
6580           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6581           if msg:
6582             self.LogWarning("Could not remove older export for instance %s"
6583                             " on node %s: %s", iname, node, msg)
6584
6585
6586 class LURemoveExport(NoHooksLU):
6587   """Remove exports related to the named instance.
6588
6589   """
6590   _OP_REQP = ["instance_name"]
6591   REQ_BGL = False
6592
6593   def ExpandNames(self):
6594     self.needed_locks = {}
6595     # We need all nodes to be locked in order for RemoveExport to work, but we
6596     # don't need to lock the instance itself, as nothing will happen to it (and
6597     # we can remove exports also for a removed instance)
6598     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6599
6600   def CheckPrereq(self):
6601     """Check prerequisites.
6602     """
6603     pass
6604
6605   def Exec(self, feedback_fn):
6606     """Remove any export.
6607
6608     """
6609     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6610     # If the instance was not found we'll try with the name that was passed in.
6611     # This will only work if it was an FQDN, though.
6612     fqdn_warn = False
6613     if not instance_name:
6614       fqdn_warn = True
6615       instance_name = self.op.instance_name
6616
6617     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6618     exportlist = self.rpc.call_export_list(locked_nodes)
6619     found = False
6620     for node in exportlist:
6621       msg = exportlist[node].RemoteFailMsg()
6622       if msg:
6623         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6624         continue
6625       if instance_name in exportlist[node].payload:
6626         found = True
6627         result = self.rpc.call_export_remove(node, instance_name)
6628         msg = result.RemoteFailMsg()
6629         if msg:
6630           logging.error("Could not remove export for instance %s"
6631                         " on node %s: %s", instance_name, node, msg)
6632
6633     if fqdn_warn and not found:
6634       feedback_fn("Export not found. If trying to remove an export belonging"
6635                   " to a deleted instance please use its Fully Qualified"
6636                   " Domain Name.")
6637
6638
6639 class TagsLU(NoHooksLU):
6640   """Generic tags LU.
6641
6642   This is an abstract class which is the parent of all the other tags LUs.
6643
6644   """
6645
6646   def ExpandNames(self):
6647     self.needed_locks = {}
6648     if self.op.kind == constants.TAG_NODE:
6649       name = self.cfg.ExpandNodeName(self.op.name)
6650       if name is None:
6651         raise errors.OpPrereqError("Invalid node name (%s)" %
6652                                    (self.op.name,))
6653       self.op.name = name
6654       self.needed_locks[locking.LEVEL_NODE] = name
6655     elif self.op.kind == constants.TAG_INSTANCE:
6656       name = self.cfg.ExpandInstanceName(self.op.name)
6657       if name is None:
6658         raise errors.OpPrereqError("Invalid instance name (%s)" %
6659                                    (self.op.name,))
6660       self.op.name = name
6661       self.needed_locks[locking.LEVEL_INSTANCE] = name
6662
6663   def CheckPrereq(self):
6664     """Check prerequisites.
6665
6666     """
6667     if self.op.kind == constants.TAG_CLUSTER:
6668       self.target = self.cfg.GetClusterInfo()
6669     elif self.op.kind == constants.TAG_NODE:
6670       self.target = self.cfg.GetNodeInfo(self.op.name)
6671     elif self.op.kind == constants.TAG_INSTANCE:
6672       self.target = self.cfg.GetInstanceInfo(self.op.name)
6673     else:
6674       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6675                                  str(self.op.kind))
6676
6677
6678 class LUGetTags(TagsLU):
6679   """Returns the tags of a given object.
6680
6681   """
6682   _OP_REQP = ["kind", "name"]
6683   REQ_BGL = False
6684
6685   def Exec(self, feedback_fn):
6686     """Returns the tag list.
6687
6688     """
6689     return list(self.target.GetTags())
6690
6691
6692 class LUSearchTags(NoHooksLU):
6693   """Searches the tags for a given pattern.
6694
6695   """
6696   _OP_REQP = ["pattern"]
6697   REQ_BGL = False
6698
6699   def ExpandNames(self):
6700     self.needed_locks = {}
6701
6702   def CheckPrereq(self):
6703     """Check prerequisites.
6704
6705     This checks the pattern passed for validity by compiling it.
6706
6707     """
6708     try:
6709       self.re = re.compile(self.op.pattern)
6710     except re.error, err:
6711       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6712                                  (self.op.pattern, err))
6713
6714   def Exec(self, feedback_fn):
6715     """Returns the tag list.
6716
6717     """
6718     cfg = self.cfg
6719     tgts = [("/cluster", cfg.GetClusterInfo())]
6720     ilist = cfg.GetAllInstancesInfo().values()
6721     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6722     nlist = cfg.GetAllNodesInfo().values()
6723     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6724     results = []
6725     for path, target in tgts:
6726       for tag in target.GetTags():
6727         if self.re.search(tag):
6728           results.append((path, tag))
6729     return results
6730
6731
6732 class LUAddTags(TagsLU):
6733   """Sets a tag on a given object.
6734
6735   """
6736   _OP_REQP = ["kind", "name", "tags"]
6737   REQ_BGL = False
6738
6739   def CheckPrereq(self):
6740     """Check prerequisites.
6741
6742     This checks the type and length of the tag name and value.
6743
6744     """
6745     TagsLU.CheckPrereq(self)
6746     for tag in self.op.tags:
6747       objects.TaggableObject.ValidateTag(tag)
6748
6749   def Exec(self, feedback_fn):
6750     """Sets the tag.
6751
6752     """
6753     try:
6754       for tag in self.op.tags:
6755         self.target.AddTag(tag)
6756     except errors.TagError, err:
6757       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6758     try:
6759       self.cfg.Update(self.target)
6760     except errors.ConfigurationError:
6761       raise errors.OpRetryError("There has been a modification to the"
6762                                 " config file and the operation has been"
6763                                 " aborted. Please retry.")
6764
6765
6766 class LUDelTags(TagsLU):
6767   """Delete a list of tags from a given object.
6768
6769   """
6770   _OP_REQP = ["kind", "name", "tags"]
6771   REQ_BGL = False
6772
6773   def CheckPrereq(self):
6774     """Check prerequisites.
6775
6776     This checks that we have the given tag.
6777
6778     """
6779     TagsLU.CheckPrereq(self)
6780     for tag in self.op.tags:
6781       objects.TaggableObject.ValidateTag(tag)
6782     del_tags = frozenset(self.op.tags)
6783     cur_tags = self.target.GetTags()
6784     if not del_tags <= cur_tags:
6785       diff_tags = del_tags - cur_tags
6786       diff_names = ["'%s'" % tag for tag in diff_tags]
6787       diff_names.sort()
6788       raise errors.OpPrereqError("Tag(s) %s not found" %
6789                                  (",".join(diff_names)))
6790
6791   def Exec(self, feedback_fn):
6792     """Remove the tag from the object.
6793
6794     """
6795     for tag in self.op.tags:
6796       self.target.RemoveTag(tag)
6797     try:
6798       self.cfg.Update(self.target)
6799     except errors.ConfigurationError:
6800       raise errors.OpRetryError("There has been a modification to the"
6801                                 " config file and the operation has been"
6802                                 " aborted. Please retry.")
6803
6804
6805 class LUTestDelay(NoHooksLU):
6806   """Sleep for a specified amount of time.
6807
6808   This LU sleeps on the master and/or nodes for a specified amount of
6809   time.
6810
6811   """
6812   _OP_REQP = ["duration", "on_master", "on_nodes"]
6813   REQ_BGL = False
6814
6815   def ExpandNames(self):
6816     """Expand names and set required locks.
6817
6818     This expands the node list, if any.
6819
6820     """
6821     self.needed_locks = {}
6822     if self.op.on_nodes:
6823       # _GetWantedNodes can be used here, but is not always appropriate to use
6824       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6825       # more information.
6826       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6827       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6828
6829   def CheckPrereq(self):
6830     """Check prerequisites.
6831
6832     """
6833
6834   def Exec(self, feedback_fn):
6835     """Do the actual sleep.
6836
6837     """
6838     if self.op.on_master:
6839       if not utils.TestDelay(self.op.duration):
6840         raise errors.OpExecError("Error during master delay test")
6841     if self.op.on_nodes:
6842       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6843       if not result:
6844         raise errors.OpExecError("Complete failure from rpc call")
6845       for node, node_result in result.items():
6846         node_result.Raise()
6847         if not node_result.data:
6848           raise errors.OpExecError("Failure during rpc call to node %s,"
6849                                    " result: %s" % (node, node_result.data))
6850
6851
6852 class IAllocator(object):
6853   """IAllocator framework.
6854
6855   An IAllocator instance has three sets of attributes:
6856     - cfg that is needed to query the cluster
6857     - input data (all members of the _KEYS class attribute are required)
6858     - four buffer attributes (in|out_data|text), that represent the
6859       input (to the external script) in text and data structure format,
6860       and the output from it, again in two formats
6861     - the result variables from the script (success, info, nodes) for
6862       easy usage
6863
6864   """
6865   _ALLO_KEYS = [
6866     "mem_size", "disks", "disk_template",
6867     "os", "tags", "nics", "vcpus", "hypervisor",
6868     ]
6869   _RELO_KEYS = [
6870     "relocate_from",
6871     ]
6872
6873   def __init__(self, lu, mode, name, **kwargs):
6874     self.lu = lu
6875     # init buffer variables
6876     self.in_text = self.out_text = self.in_data = self.out_data = None
6877     # init all input fields so that pylint is happy
6878     self.mode = mode
6879     self.name = name
6880     self.mem_size = self.disks = self.disk_template = None
6881     self.os = self.tags = self.nics = self.vcpus = None
6882     self.hypervisor = None
6883     self.relocate_from = None
6884     # computed fields
6885     self.required_nodes = None
6886     # init result fields
6887     self.success = self.info = self.nodes = None
6888     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6889       keyset = self._ALLO_KEYS
6890     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6891       keyset = self._RELO_KEYS
6892     else:
6893       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6894                                    " IAllocator" % self.mode)
6895     for key in kwargs:
6896       if key not in keyset:
6897         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6898                                      " IAllocator" % key)
6899       setattr(self, key, kwargs[key])
6900     for key in keyset:
6901       if key not in kwargs:
6902         raise errors.ProgrammerError("Missing input parameter '%s' to"
6903                                      " IAllocator" % key)
6904     self._BuildInputData()
6905
6906   def _ComputeClusterData(self):
6907     """Compute the generic allocator input data.
6908
6909     This is the data that is independent of the actual operation.
6910
6911     """
6912     cfg = self.lu.cfg
6913     cluster_info = cfg.GetClusterInfo()
6914     # cluster data
6915     data = {
6916       "version": constants.IALLOCATOR_VERSION,
6917       "cluster_name": cfg.GetClusterName(),
6918       "cluster_tags": list(cluster_info.GetTags()),
6919       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6920       # we don't have job IDs
6921       }
6922     iinfo = cfg.GetAllInstancesInfo().values()
6923     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6924
6925     # node data
6926     node_results = {}
6927     node_list = cfg.GetNodeList()
6928
6929     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6930       hypervisor_name = self.hypervisor
6931     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6932       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6933
6934     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6935                                            hypervisor_name)
6936     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6937                        cluster_info.enabled_hypervisors)
6938     for nname, nresult in node_data.items():
6939       # first fill in static (config-based) values
6940       ninfo = cfg.GetNodeInfo(nname)
6941       pnr = {
6942         "tags": list(ninfo.GetTags()),
6943         "primary_ip": ninfo.primary_ip,
6944         "secondary_ip": ninfo.secondary_ip,
6945         "offline": ninfo.offline,
6946         "drained": ninfo.drained,
6947         "master_candidate": ninfo.master_candidate,
6948         }
6949
6950       if not ninfo.offline:
6951         msg = nresult.RemoteFailMsg()
6952         if msg:
6953           raise errors.OpExecError("Can't get data for node %s: %s" %
6954                                    (nname, msg))
6955         msg = node_iinfo[nname].RemoteFailMsg()
6956         if msg:
6957           raise errors.OpExecError("Can't get node instance info"
6958                                    " from node %s: %s" % (nname, msg))
6959         remote_info = nresult.payload
6960         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6961                      'vg_size', 'vg_free', 'cpu_total']:
6962           if attr not in remote_info:
6963             raise errors.OpExecError("Node '%s' didn't return attribute"
6964                                      " '%s'" % (nname, attr))
6965           if not isinstance(remote_info[attr], int):
6966             raise errors.OpExecError("Node '%s' returned invalid value"
6967                                      " for '%s': %s" %
6968                                      (nname, attr, remote_info[attr]))
6969         # compute memory used by primary instances
6970         i_p_mem = i_p_up_mem = 0
6971         for iinfo, beinfo in i_list:
6972           if iinfo.primary_node == nname:
6973             i_p_mem += beinfo[constants.BE_MEMORY]
6974             if iinfo.name not in node_iinfo[nname].payload:
6975               i_used_mem = 0
6976             else:
6977               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6978             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6979             remote_info['memory_free'] -= max(0, i_mem_diff)
6980
6981             if iinfo.admin_up:
6982               i_p_up_mem += beinfo[constants.BE_MEMORY]
6983
6984         # compute memory used by instances
6985         pnr_dyn = {
6986           "total_memory": remote_info['memory_total'],
6987           "reserved_memory": remote_info['memory_dom0'],
6988           "free_memory": remote_info['memory_free'],
6989           "total_disk": remote_info['vg_size'],
6990           "free_disk": remote_info['vg_free'],
6991           "total_cpus": remote_info['cpu_total'],
6992           "i_pri_memory": i_p_mem,
6993           "i_pri_up_memory": i_p_up_mem,
6994           }
6995         pnr.update(pnr_dyn)
6996
6997       node_results[nname] = pnr
6998     data["nodes"] = node_results
6999
7000     # instance data
7001     instance_data = {}
7002     for iinfo, beinfo in i_list:
7003       nic_data = []
7004       for nic in iinfo.nics:
7005         filled_params = objects.FillDict(
7006             cluster_info.nicparams[constants.PP_DEFAULT],
7007             nic.nicparams)
7008         nic_dict = {"mac": nic.mac,
7009                     "ip": nic.ip,
7010                     "mode": filled_params[constants.NIC_MODE],
7011                     "link": filled_params[constants.NIC_LINK],
7012                    }
7013         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7014           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7015         nic_data.append(nic_dict)
7016       pir = {
7017         "tags": list(iinfo.GetTags()),
7018         "admin_up": iinfo.admin_up,
7019         "vcpus": beinfo[constants.BE_VCPUS],
7020         "memory": beinfo[constants.BE_MEMORY],
7021         "os": iinfo.os,
7022         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7023         "nics": nic_data,
7024         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7025         "disk_template": iinfo.disk_template,
7026         "hypervisor": iinfo.hypervisor,
7027         }
7028       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7029                                                  pir["disks"])
7030       instance_data[iinfo.name] = pir
7031
7032     data["instances"] = instance_data
7033
7034     self.in_data = data
7035
7036   def _AddNewInstance(self):
7037     """Add new instance data to allocator structure.
7038
7039     This in combination with _AllocatorGetClusterData will create the
7040     correct structure needed as input for the allocator.
7041
7042     The checks for the completeness of the opcode must have already been
7043     done.
7044
7045     """
7046     data = self.in_data
7047
7048     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7049
7050     if self.disk_template in constants.DTS_NET_MIRROR:
7051       self.required_nodes = 2
7052     else:
7053       self.required_nodes = 1
7054     request = {
7055       "type": "allocate",
7056       "name": self.name,
7057       "disk_template": self.disk_template,
7058       "tags": self.tags,
7059       "os": self.os,
7060       "vcpus": self.vcpus,
7061       "memory": self.mem_size,
7062       "disks": self.disks,
7063       "disk_space_total": disk_space,
7064       "nics": self.nics,
7065       "required_nodes": self.required_nodes,
7066       }
7067     data["request"] = request
7068
7069   def _AddRelocateInstance(self):
7070     """Add relocate instance data to allocator structure.
7071
7072     This in combination with _IAllocatorGetClusterData will create the
7073     correct structure needed as input for the allocator.
7074
7075     The checks for the completeness of the opcode must have already been
7076     done.
7077
7078     """
7079     instance = self.lu.cfg.GetInstanceInfo(self.name)
7080     if instance is None:
7081       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7082                                    " IAllocator" % self.name)
7083
7084     if instance.disk_template not in constants.DTS_NET_MIRROR:
7085       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7086
7087     if len(instance.secondary_nodes) != 1:
7088       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7089
7090     self.required_nodes = 1
7091     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7092     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7093
7094     request = {
7095       "type": "relocate",
7096       "name": self.name,
7097       "disk_space_total": disk_space,
7098       "required_nodes": self.required_nodes,
7099       "relocate_from": self.relocate_from,
7100       }
7101     self.in_data["request"] = request
7102
7103   def _BuildInputData(self):
7104     """Build input data structures.
7105
7106     """
7107     self._ComputeClusterData()
7108
7109     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7110       self._AddNewInstance()
7111     else:
7112       self._AddRelocateInstance()
7113
7114     self.in_text = serializer.Dump(self.in_data)
7115
7116   def Run(self, name, validate=True, call_fn=None):
7117     """Run an instance allocator and return the results.
7118
7119     """
7120     if call_fn is None:
7121       call_fn = self.lu.rpc.call_iallocator_runner
7122     data = self.in_text
7123
7124     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7125     result.Raise()
7126
7127     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7128       raise errors.OpExecError("Invalid result from master iallocator runner")
7129
7130     rcode, stdout, stderr, fail = result.data
7131
7132     if rcode == constants.IARUN_NOTFOUND:
7133       raise errors.OpExecError("Can't find allocator '%s'" % name)
7134     elif rcode == constants.IARUN_FAILURE:
7135       raise errors.OpExecError("Instance allocator call failed: %s,"
7136                                " output: %s" % (fail, stdout+stderr))
7137     self.out_text = stdout
7138     if validate:
7139       self._ValidateResult()
7140
7141   def _ValidateResult(self):
7142     """Process the allocator results.
7143
7144     This will process and if successful save the result in
7145     self.out_data and the other parameters.
7146
7147     """
7148     try:
7149       rdict = serializer.Load(self.out_text)
7150     except Exception, err:
7151       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7152
7153     if not isinstance(rdict, dict):
7154       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7155
7156     for key in "success", "info", "nodes":
7157       if key not in rdict:
7158         raise errors.OpExecError("Can't parse iallocator results:"
7159                                  " missing key '%s'" % key)
7160       setattr(self, key, rdict[key])
7161
7162     if not isinstance(rdict["nodes"], list):
7163       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7164                                " is not a list")
7165     self.out_data = rdict
7166
7167
7168 class LUTestAllocator(NoHooksLU):
7169   """Run allocator tests.
7170
7171   This LU runs the allocator tests
7172
7173   """
7174   _OP_REQP = ["direction", "mode", "name"]
7175
7176   def CheckPrereq(self):
7177     """Check prerequisites.
7178
7179     This checks the opcode parameters depending on the director and mode test.
7180
7181     """
7182     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7183       for attr in ["name", "mem_size", "disks", "disk_template",
7184                    "os", "tags", "nics", "vcpus"]:
7185         if not hasattr(self.op, attr):
7186           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7187                                      attr)
7188       iname = self.cfg.ExpandInstanceName(self.op.name)
7189       if iname is not None:
7190         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7191                                    iname)
7192       if not isinstance(self.op.nics, list):
7193         raise errors.OpPrereqError("Invalid parameter 'nics'")
7194       for row in self.op.nics:
7195         if (not isinstance(row, dict) or
7196             "mac" not in row or
7197             "ip" not in row or
7198             "bridge" not in row):
7199           raise errors.OpPrereqError("Invalid contents of the"
7200                                      " 'nics' parameter")
7201       if not isinstance(self.op.disks, list):
7202         raise errors.OpPrereqError("Invalid parameter 'disks'")
7203       for row in self.op.disks:
7204         if (not isinstance(row, dict) or
7205             "size" not in row or
7206             not isinstance(row["size"], int) or
7207             "mode" not in row or
7208             row["mode"] not in ['r', 'w']):
7209           raise errors.OpPrereqError("Invalid contents of the"
7210                                      " 'disks' parameter")
7211       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7212         self.op.hypervisor = self.cfg.GetHypervisorType()
7213     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7214       if not hasattr(self.op, "name"):
7215         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7216       fname = self.cfg.ExpandInstanceName(self.op.name)
7217       if fname is None:
7218         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7219                                    self.op.name)
7220       self.op.name = fname
7221       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7222     else:
7223       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7224                                  self.op.mode)
7225
7226     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7227       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7228         raise errors.OpPrereqError("Missing allocator name")
7229     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7230       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7231                                  self.op.direction)
7232
7233   def Exec(self, feedback_fn):
7234     """Run the allocator test.
7235
7236     """
7237     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7238       ial = IAllocator(self,
7239                        mode=self.op.mode,
7240                        name=self.op.name,
7241                        mem_size=self.op.mem_size,
7242                        disks=self.op.disks,
7243                        disk_template=self.op.disk_template,
7244                        os=self.op.os,
7245                        tags=self.op.tags,
7246                        nics=self.op.nics,
7247                        vcpus=self.op.vcpus,
7248                        hypervisor=self.op.hypervisor,
7249                        )
7250     else:
7251       ial = IAllocator(self,
7252                        mode=self.op.mode,
7253                        name=self.op.name,
7254                        relocate_from=list(self.relocate_from),
7255                        )
7256
7257     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7258       result = ial.in_text
7259     else:
7260       ial.Run(self.op.allocator, validate=False)
7261       result = ial.out_text
7262     return result