Big rewrite of the OS-related functions
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     msg = result.RemoteFailMsg()
660     if msg:
661       raise errors.OpExecError("Could not disable the master role: %s" % msg)
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           msg = res.RemoteFailMsg()
1246           if msg:
1247             if res.offline:
1248               # no need to warn or set fail return value
1249               continue
1250             feedback_fn("    Communication failure in hooks execution: %s" %
1251                         msg)
1252             lu_result = 1
1253             continue
1254           for script, hkr, output in res.payload:
1255             if hkr == constants.HKR_FAIL:
1256               # The node header is only shown once, if there are
1257               # failing hooks on that node
1258               if show_node_header:
1259                 feedback_fn("  Node %s:" % node_name)
1260                 show_node_header = False
1261               feedback_fn("    ERROR: Script %s failed, output:" % script)
1262               output = indent_re.sub('      ', output)
1263               feedback_fn("%s" % output)
1264               lu_result = 1
1265
1266       return lu_result
1267
1268
1269 class LUVerifyDisks(NoHooksLU):
1270   """Verifies the cluster disks status.
1271
1272   """
1273   _OP_REQP = []
1274   REQ_BGL = False
1275
1276   def ExpandNames(self):
1277     self.needed_locks = {
1278       locking.LEVEL_NODE: locking.ALL_SET,
1279       locking.LEVEL_INSTANCE: locking.ALL_SET,
1280     }
1281     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1282
1283   def CheckPrereq(self):
1284     """Check prerequisites.
1285
1286     This has no prerequisites.
1287
1288     """
1289     pass
1290
1291   def Exec(self, feedback_fn):
1292     """Verify integrity of cluster disks.
1293
1294     @rtype: tuple of three items
1295     @return: a tuple of (dict of node-to-node_error, list of instances
1296         which need activate-disks, dict of instance: (node, volume) for
1297         missing volumes
1298
1299     """
1300     result = res_nodes, res_instances, res_missing = {}, [], {}
1301
1302     vg_name = self.cfg.GetVGName()
1303     nodes = utils.NiceSort(self.cfg.GetNodeList())
1304     instances = [self.cfg.GetInstanceInfo(name)
1305                  for name in self.cfg.GetInstanceList()]
1306
1307     nv_dict = {}
1308     for inst in instances:
1309       inst_lvs = {}
1310       if (not inst.admin_up or
1311           inst.disk_template not in constants.DTS_NET_MIRROR):
1312         continue
1313       inst.MapLVsByNode(inst_lvs)
1314       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1315       for node, vol_list in inst_lvs.iteritems():
1316         for vol in vol_list:
1317           nv_dict[(node, vol)] = inst
1318
1319     if not nv_dict:
1320       return result
1321
1322     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1323
1324     to_act = set()
1325     for node in nodes:
1326       # node_volume
1327       node_res = node_lvs[node]
1328       if node_res.offline:
1329         continue
1330       msg = node_res.RemoteFailMsg()
1331       if msg:
1332         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1333         res_nodes[node] = msg
1334         continue
1335
1336       lvs = node_res.payload
1337       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1338         inst = nv_dict.pop((node, lv_name), None)
1339         if (not lv_online and inst is not None
1340             and inst.name not in res_instances):
1341           res_instances.append(inst.name)
1342
1343     # any leftover items in nv_dict are missing LVs, let's arrange the
1344     # data better
1345     for key, inst in nv_dict.iteritems():
1346       if inst.name not in res_missing:
1347         res_missing[inst.name] = []
1348       res_missing[inst.name].append(key)
1349
1350     return result
1351
1352
1353 class LURenameCluster(LogicalUnit):
1354   """Rename the cluster.
1355
1356   """
1357   HPATH = "cluster-rename"
1358   HTYPE = constants.HTYPE_CLUSTER
1359   _OP_REQP = ["name"]
1360
1361   def BuildHooksEnv(self):
1362     """Build hooks env.
1363
1364     """
1365     env = {
1366       "OP_TARGET": self.cfg.GetClusterName(),
1367       "NEW_NAME": self.op.name,
1368       }
1369     mn = self.cfg.GetMasterNode()
1370     return env, [mn], [mn]
1371
1372   def CheckPrereq(self):
1373     """Verify that the passed name is a valid one.
1374
1375     """
1376     hostname = utils.HostInfo(self.op.name)
1377
1378     new_name = hostname.name
1379     self.ip = new_ip = hostname.ip
1380     old_name = self.cfg.GetClusterName()
1381     old_ip = self.cfg.GetMasterIP()
1382     if new_name == old_name and new_ip == old_ip:
1383       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1384                                  " cluster has changed")
1385     if new_ip != old_ip:
1386       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1387         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1388                                    " reachable on the network. Aborting." %
1389                                    new_ip)
1390
1391     self.op.name = new_name
1392
1393   def Exec(self, feedback_fn):
1394     """Rename the cluster.
1395
1396     """
1397     clustername = self.op.name
1398     ip = self.ip
1399
1400     # shutdown the master IP
1401     master = self.cfg.GetMasterNode()
1402     result = self.rpc.call_node_stop_master(master, False)
1403     msg = result.RemoteFailMsg()
1404     if msg:
1405       raise errors.OpExecError("Could not disable the master role: %s" % msg)
1406
1407     try:
1408       cluster = self.cfg.GetClusterInfo()
1409       cluster.cluster_name = clustername
1410       cluster.master_ip = ip
1411       self.cfg.Update(cluster)
1412
1413       # update the known hosts file
1414       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1415       node_list = self.cfg.GetNodeList()
1416       try:
1417         node_list.remove(master)
1418       except ValueError:
1419         pass
1420       result = self.rpc.call_upload_file(node_list,
1421                                          constants.SSH_KNOWN_HOSTS_FILE)
1422       for to_node, to_result in result.iteritems():
1423          msg = to_result.RemoteFailMsg()
1424          if msg:
1425            msg = ("Copy of file %s to node %s failed: %s" %
1426                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1427            self.proc.LogWarning(msg)
1428
1429     finally:
1430       result = self.rpc.call_node_start_master(master, False)
1431       msg = result.RemoteFailMsg()
1432       if msg:
1433         self.LogWarning("Could not re-enable the master role on"
1434                         " the master, please restart manually: %s", msg)
1435
1436
1437 def _RecursiveCheckIfLVMBased(disk):
1438   """Check if the given disk or its children are lvm-based.
1439
1440   @type disk: L{objects.Disk}
1441   @param disk: the disk to check
1442   @rtype: booleean
1443   @return: boolean indicating whether a LD_LV dev_type was found or not
1444
1445   """
1446   if disk.children:
1447     for chdisk in disk.children:
1448       if _RecursiveCheckIfLVMBased(chdisk):
1449         return True
1450   return disk.dev_type == constants.LD_LV
1451
1452
1453 class LUSetClusterParams(LogicalUnit):
1454   """Change the parameters of the cluster.
1455
1456   """
1457   HPATH = "cluster-modify"
1458   HTYPE = constants.HTYPE_CLUSTER
1459   _OP_REQP = []
1460   REQ_BGL = False
1461
1462   def CheckArguments(self):
1463     """Check parameters
1464
1465     """
1466     if not hasattr(self.op, "candidate_pool_size"):
1467       self.op.candidate_pool_size = None
1468     if self.op.candidate_pool_size is not None:
1469       try:
1470         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1471       except (ValueError, TypeError), err:
1472         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1473                                    str(err))
1474       if self.op.candidate_pool_size < 1:
1475         raise errors.OpPrereqError("At least one master candidate needed")
1476
1477   def ExpandNames(self):
1478     # FIXME: in the future maybe other cluster params won't require checking on
1479     # all nodes to be modified.
1480     self.needed_locks = {
1481       locking.LEVEL_NODE: locking.ALL_SET,
1482     }
1483     self.share_locks[locking.LEVEL_NODE] = 1
1484
1485   def BuildHooksEnv(self):
1486     """Build hooks env.
1487
1488     """
1489     env = {
1490       "OP_TARGET": self.cfg.GetClusterName(),
1491       "NEW_VG_NAME": self.op.vg_name,
1492       }
1493     mn = self.cfg.GetMasterNode()
1494     return env, [mn], [mn]
1495
1496   def CheckPrereq(self):
1497     """Check prerequisites.
1498
1499     This checks whether the given params don't conflict and
1500     if the given volume group is valid.
1501
1502     """
1503     if self.op.vg_name is not None and not self.op.vg_name:
1504       instances = self.cfg.GetAllInstancesInfo().values()
1505       for inst in instances:
1506         for disk in inst.disks:
1507           if _RecursiveCheckIfLVMBased(disk):
1508             raise errors.OpPrereqError("Cannot disable lvm storage while"
1509                                        " lvm-based instances exist")
1510
1511     node_list = self.acquired_locks[locking.LEVEL_NODE]
1512
1513     # if vg_name not None, checks given volume group on all nodes
1514     if self.op.vg_name:
1515       vglist = self.rpc.call_vg_list(node_list)
1516       for node in node_list:
1517         msg = vglist[node].RemoteFailMsg()
1518         if msg:
1519           # ignoring down node
1520           self.LogWarning("Error while gathering data on node %s"
1521                           " (ignoring node): %s", node, msg)
1522           continue
1523         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1524                                               self.op.vg_name,
1525                                               constants.MIN_VG_SIZE)
1526         if vgstatus:
1527           raise errors.OpPrereqError("Error on node '%s': %s" %
1528                                      (node, vgstatus))
1529
1530     self.cluster = cluster = self.cfg.GetClusterInfo()
1531     # validate params changes
1532     if self.op.beparams:
1533       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1534       self.new_beparams = objects.FillDict(
1535         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1536
1537     if self.op.nicparams:
1538       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1539       self.new_nicparams = objects.FillDict(
1540         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1541       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1542
1543     # hypervisor list/parameters
1544     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1545     if self.op.hvparams:
1546       if not isinstance(self.op.hvparams, dict):
1547         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1548       for hv_name, hv_dict in self.op.hvparams.items():
1549         if hv_name not in self.new_hvparams:
1550           self.new_hvparams[hv_name] = hv_dict
1551         else:
1552           self.new_hvparams[hv_name].update(hv_dict)
1553
1554     if self.op.enabled_hypervisors is not None:
1555       self.hv_list = self.op.enabled_hypervisors
1556     else:
1557       self.hv_list = cluster.enabled_hypervisors
1558
1559     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1560       # either the enabled list has changed, or the parameters have, validate
1561       for hv_name, hv_params in self.new_hvparams.items():
1562         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1563             (self.op.enabled_hypervisors and
1564              hv_name in self.op.enabled_hypervisors)):
1565           # either this is a new hypervisor, or its parameters have changed
1566           hv_class = hypervisor.GetHypervisor(hv_name)
1567           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1568           hv_class.CheckParameterSyntax(hv_params)
1569           _CheckHVParams(self, node_list, hv_name, hv_params)
1570
1571   def Exec(self, feedback_fn):
1572     """Change the parameters of the cluster.
1573
1574     """
1575     if self.op.vg_name is not None:
1576       new_volume = self.op.vg_name
1577       if not new_volume:
1578         new_volume = None
1579       if new_volume != self.cfg.GetVGName():
1580         self.cfg.SetVGName(new_volume)
1581       else:
1582         feedback_fn("Cluster LVM configuration already in desired"
1583                     " state, not changing")
1584     if self.op.hvparams:
1585       self.cluster.hvparams = self.new_hvparams
1586     if self.op.enabled_hypervisors is not None:
1587       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1588     if self.op.beparams:
1589       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1590     if self.op.nicparams:
1591       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1592
1593     if self.op.candidate_pool_size is not None:
1594       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1595
1596     self.cfg.Update(self.cluster)
1597
1598     # we want to update nodes after the cluster so that if any errors
1599     # happen, we have recorded and saved the cluster info
1600     if self.op.candidate_pool_size is not None:
1601       _AdjustCandidatePool(self)
1602
1603
1604 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1605   """Distribute additional files which are part of the cluster configuration.
1606
1607   ConfigWriter takes care of distributing the config and ssconf files, but
1608   there are more files which should be distributed to all nodes. This function
1609   makes sure those are copied.
1610
1611   @param lu: calling logical unit
1612   @param additional_nodes: list of nodes not in the config to distribute to
1613
1614   """
1615   # 1. Gather target nodes
1616   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1617   dist_nodes = lu.cfg.GetNodeList()
1618   if additional_nodes is not None:
1619     dist_nodes.extend(additional_nodes)
1620   if myself.name in dist_nodes:
1621     dist_nodes.remove(myself.name)
1622   # 2. Gather files to distribute
1623   dist_files = set([constants.ETC_HOSTS,
1624                     constants.SSH_KNOWN_HOSTS_FILE,
1625                     constants.RAPI_CERT_FILE,
1626                     constants.RAPI_USERS_FILE,
1627                    ])
1628
1629   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1630   for hv_name in enabled_hypervisors:
1631     hv_class = hypervisor.GetHypervisor(hv_name)
1632     dist_files.update(hv_class.GetAncillaryFiles())
1633
1634   # 3. Perform the files upload
1635   for fname in dist_files:
1636     if os.path.exists(fname):
1637       result = lu.rpc.call_upload_file(dist_nodes, fname)
1638       for to_node, to_result in result.items():
1639          msg = to_result.RemoteFailMsg()
1640          if msg:
1641            msg = ("Copy of file %s to node %s failed: %s" %
1642                    (fname, to_node, msg))
1643            lu.proc.LogWarning(msg)
1644
1645
1646 class LURedistributeConfig(NoHooksLU):
1647   """Force the redistribution of cluster configuration.
1648
1649   This is a very simple LU.
1650
1651   """
1652   _OP_REQP = []
1653   REQ_BGL = False
1654
1655   def ExpandNames(self):
1656     self.needed_locks = {
1657       locking.LEVEL_NODE: locking.ALL_SET,
1658     }
1659     self.share_locks[locking.LEVEL_NODE] = 1
1660
1661   def CheckPrereq(self):
1662     """Check prerequisites.
1663
1664     """
1665
1666   def Exec(self, feedback_fn):
1667     """Redistribute the configuration.
1668
1669     """
1670     self.cfg.Update(self.cfg.GetClusterInfo())
1671     _RedistributeAncillaryFiles(self)
1672
1673
1674 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1675   """Sleep and poll for an instance's disk to sync.
1676
1677   """
1678   if not instance.disks:
1679     return True
1680
1681   if not oneshot:
1682     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1683
1684   node = instance.primary_node
1685
1686   for dev in instance.disks:
1687     lu.cfg.SetDiskID(dev, node)
1688
1689   retries = 0
1690   while True:
1691     max_time = 0
1692     done = True
1693     cumul_degraded = False
1694     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1695     msg = rstats.RemoteFailMsg()
1696     if msg:
1697       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1698       retries += 1
1699       if retries >= 10:
1700         raise errors.RemoteError("Can't contact node %s for mirror data,"
1701                                  " aborting." % node)
1702       time.sleep(6)
1703       continue
1704     rstats = rstats.payload
1705     retries = 0
1706     for i, mstat in enumerate(rstats):
1707       if mstat is None:
1708         lu.LogWarning("Can't compute data for node %s/%s",
1709                            node, instance.disks[i].iv_name)
1710         continue
1711       # we ignore the ldisk parameter
1712       perc_done, est_time, is_degraded, _ = mstat
1713       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1714       if perc_done is not None:
1715         done = False
1716         if est_time is not None:
1717           rem_time = "%d estimated seconds remaining" % est_time
1718           max_time = est_time
1719         else:
1720           rem_time = "no time estimate"
1721         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1722                         (instance.disks[i].iv_name, perc_done, rem_time))
1723     if done or oneshot:
1724       break
1725
1726     time.sleep(min(60, max_time))
1727
1728   if done:
1729     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1730   return not cumul_degraded
1731
1732
1733 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1734   """Check that mirrors are not degraded.
1735
1736   The ldisk parameter, if True, will change the test from the
1737   is_degraded attribute (which represents overall non-ok status for
1738   the device(s)) to the ldisk (representing the local storage status).
1739
1740   """
1741   lu.cfg.SetDiskID(dev, node)
1742   if ldisk:
1743     idx = 6
1744   else:
1745     idx = 5
1746
1747   result = True
1748   if on_primary or dev.AssembleOnSecondary():
1749     rstats = lu.rpc.call_blockdev_find(node, dev)
1750     msg = rstats.RemoteFailMsg()
1751     if msg:
1752       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1753       result = False
1754     elif not rstats.payload:
1755       lu.LogWarning("Can't find disk on node %s", node)
1756       result = False
1757     else:
1758       result = result and (not rstats.payload[idx])
1759   if dev.children:
1760     for child in dev.children:
1761       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1762
1763   return result
1764
1765
1766 class LUDiagnoseOS(NoHooksLU):
1767   """Logical unit for OS diagnose/query.
1768
1769   """
1770   _OP_REQP = ["output_fields", "names"]
1771   REQ_BGL = False
1772   _FIELDS_STATIC = utils.FieldSet()
1773   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1774
1775   def ExpandNames(self):
1776     if self.op.names:
1777       raise errors.OpPrereqError("Selective OS query not supported")
1778
1779     _CheckOutputFields(static=self._FIELDS_STATIC,
1780                        dynamic=self._FIELDS_DYNAMIC,
1781                        selected=self.op.output_fields)
1782
1783     # Lock all nodes, in shared mode
1784     # Temporary removal of locks, should be reverted later
1785     # TODO: reintroduce locks when they are lighter-weight
1786     self.needed_locks = {}
1787     #self.share_locks[locking.LEVEL_NODE] = 1
1788     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1789
1790   def CheckPrereq(self):
1791     """Check prerequisites.
1792
1793     """
1794
1795   @staticmethod
1796   def _DiagnoseByOS(node_list, rlist):
1797     """Remaps a per-node return list into an a per-os per-node dictionary
1798
1799     @param node_list: a list with the names of all nodes
1800     @param rlist: a map with node names as keys and OS objects as values
1801
1802     @rtype: dict
1803     @return: a dictionary with osnames as keys and as value another map, with
1804         nodes as keys and tuples of (path, status, diagnose) as values, eg::
1805
1806           {"debian-etch": {"node1": [(/usr/lib/..., True, ""),
1807                                      (/srv/..., False, "invalid api")],
1808                            "node2": [(/srv/..., True, "")]}
1809           }
1810
1811     """
1812     all_os = {}
1813     # we build here the list of nodes that didn't fail the RPC (at RPC
1814     # level), so that nodes with a non-responding node daemon don't
1815     # make all OSes invalid
1816     good_nodes = [node_name for node_name in rlist
1817                   if not rlist[node_name].RemoteFailMsg()]
1818     for node_name, nr in rlist.items():
1819       if nr.RemoteFailMsg() or not nr.payload:
1820         continue
1821       for name, path, status, diagnose in nr.payload:
1822         if name not in all_os:
1823           # build a list of nodes for this os containing empty lists
1824           # for each node in node_list
1825           all_os[name] = {}
1826           for nname in good_nodes:
1827             all_os[name][nname] = []
1828         all_os[name][node_name].append((path, status, diagnose))
1829     return all_os
1830
1831   def Exec(self, feedback_fn):
1832     """Compute the list of OSes.
1833
1834     """
1835     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1836     node_data = self.rpc.call_os_diagnose(valid_nodes)
1837     pol = self._DiagnoseByOS(valid_nodes, node_data)
1838     output = []
1839     for os_name, os_data in pol.items():
1840       row = []
1841       for field in self.op.output_fields:
1842         if field == "name":
1843           val = os_name
1844         elif field == "valid":
1845           val = utils.all([osl and osl[0][1] for osl in os_data.values()])
1846         elif field == "node_status":
1847           # this is just a copy of the dict
1848           val = {}
1849           for node_name, nos_list in os_data.items():
1850             val[node_name] = nos_list
1851         else:
1852           raise errors.ParameterError(field)
1853         row.append(val)
1854       output.append(row)
1855
1856     return output
1857
1858
1859 class LURemoveNode(LogicalUnit):
1860   """Logical unit for removing a node.
1861
1862   """
1863   HPATH = "node-remove"
1864   HTYPE = constants.HTYPE_NODE
1865   _OP_REQP = ["node_name"]
1866
1867   def BuildHooksEnv(self):
1868     """Build hooks env.
1869
1870     This doesn't run on the target node in the pre phase as a failed
1871     node would then be impossible to remove.
1872
1873     """
1874     env = {
1875       "OP_TARGET": self.op.node_name,
1876       "NODE_NAME": self.op.node_name,
1877       }
1878     all_nodes = self.cfg.GetNodeList()
1879     all_nodes.remove(self.op.node_name)
1880     return env, all_nodes, all_nodes
1881
1882   def CheckPrereq(self):
1883     """Check prerequisites.
1884
1885     This checks:
1886      - the node exists in the configuration
1887      - it does not have primary or secondary instances
1888      - it's not the master
1889
1890     Any errors are signalled by raising errors.OpPrereqError.
1891
1892     """
1893     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1894     if node is None:
1895       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1896
1897     instance_list = self.cfg.GetInstanceList()
1898
1899     masternode = self.cfg.GetMasterNode()
1900     if node.name == masternode:
1901       raise errors.OpPrereqError("Node is the master node,"
1902                                  " you need to failover first.")
1903
1904     for instance_name in instance_list:
1905       instance = self.cfg.GetInstanceInfo(instance_name)
1906       if node.name in instance.all_nodes:
1907         raise errors.OpPrereqError("Instance %s is still running on the node,"
1908                                    " please remove first." % instance_name)
1909     self.op.node_name = node.name
1910     self.node = node
1911
1912   def Exec(self, feedback_fn):
1913     """Removes the node from the cluster.
1914
1915     """
1916     node = self.node
1917     logging.info("Stopping the node daemon and removing configs from node %s",
1918                  node.name)
1919
1920     self.context.RemoveNode(node.name)
1921
1922     result = self.rpc.call_node_leave_cluster(node.name)
1923     msg = result.RemoteFailMsg()
1924     if msg:
1925       self.LogWarning("Errors encountered on the remote node while leaving"
1926                       " the cluster: %s", msg)
1927
1928     # Promote nodes to master candidate as needed
1929     _AdjustCandidatePool(self)
1930
1931
1932 class LUQueryNodes(NoHooksLU):
1933   """Logical unit for querying nodes.
1934
1935   """
1936   _OP_REQP = ["output_fields", "names", "use_locking"]
1937   REQ_BGL = False
1938   _FIELDS_DYNAMIC = utils.FieldSet(
1939     "dtotal", "dfree",
1940     "mtotal", "mnode", "mfree",
1941     "bootid",
1942     "ctotal", "cnodes", "csockets",
1943     )
1944
1945   _FIELDS_STATIC = utils.FieldSet(
1946     "name", "pinst_cnt", "sinst_cnt",
1947     "pinst_list", "sinst_list",
1948     "pip", "sip", "tags",
1949     "serial_no",
1950     "master_candidate",
1951     "master",
1952     "offline",
1953     "drained",
1954     )
1955
1956   def ExpandNames(self):
1957     _CheckOutputFields(static=self._FIELDS_STATIC,
1958                        dynamic=self._FIELDS_DYNAMIC,
1959                        selected=self.op.output_fields)
1960
1961     self.needed_locks = {}
1962     self.share_locks[locking.LEVEL_NODE] = 1
1963
1964     if self.op.names:
1965       self.wanted = _GetWantedNodes(self, self.op.names)
1966     else:
1967       self.wanted = locking.ALL_SET
1968
1969     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1970     self.do_locking = self.do_node_query and self.op.use_locking
1971     if self.do_locking:
1972       # if we don't request only static fields, we need to lock the nodes
1973       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1974
1975
1976   def CheckPrereq(self):
1977     """Check prerequisites.
1978
1979     """
1980     # The validation of the node list is done in the _GetWantedNodes,
1981     # if non empty, and if empty, there's no validation to do
1982     pass
1983
1984   def Exec(self, feedback_fn):
1985     """Computes the list of nodes and their attributes.
1986
1987     """
1988     all_info = self.cfg.GetAllNodesInfo()
1989     if self.do_locking:
1990       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1991     elif self.wanted != locking.ALL_SET:
1992       nodenames = self.wanted
1993       missing = set(nodenames).difference(all_info.keys())
1994       if missing:
1995         raise errors.OpExecError(
1996           "Some nodes were removed before retrieving their data: %s" % missing)
1997     else:
1998       nodenames = all_info.keys()
1999
2000     nodenames = utils.NiceSort(nodenames)
2001     nodelist = [all_info[name] for name in nodenames]
2002
2003     # begin data gathering
2004
2005     if self.do_node_query:
2006       live_data = {}
2007       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2008                                           self.cfg.GetHypervisorType())
2009       for name in nodenames:
2010         nodeinfo = node_data[name]
2011         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2012           nodeinfo = nodeinfo.payload
2013           fn = utils.TryConvert
2014           live_data[name] = {
2015             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2016             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2017             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2018             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2019             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2020             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2021             "bootid": nodeinfo.get('bootid', None),
2022             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2023             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2024             }
2025         else:
2026           live_data[name] = {}
2027     else:
2028       live_data = dict.fromkeys(nodenames, {})
2029
2030     node_to_primary = dict([(name, set()) for name in nodenames])
2031     node_to_secondary = dict([(name, set()) for name in nodenames])
2032
2033     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2034                              "sinst_cnt", "sinst_list"))
2035     if inst_fields & frozenset(self.op.output_fields):
2036       instancelist = self.cfg.GetInstanceList()
2037
2038       for instance_name in instancelist:
2039         inst = self.cfg.GetInstanceInfo(instance_name)
2040         if inst.primary_node in node_to_primary:
2041           node_to_primary[inst.primary_node].add(inst.name)
2042         for secnode in inst.secondary_nodes:
2043           if secnode in node_to_secondary:
2044             node_to_secondary[secnode].add(inst.name)
2045
2046     master_node = self.cfg.GetMasterNode()
2047
2048     # end data gathering
2049
2050     output = []
2051     for node in nodelist:
2052       node_output = []
2053       for field in self.op.output_fields:
2054         if field == "name":
2055           val = node.name
2056         elif field == "pinst_list":
2057           val = list(node_to_primary[node.name])
2058         elif field == "sinst_list":
2059           val = list(node_to_secondary[node.name])
2060         elif field == "pinst_cnt":
2061           val = len(node_to_primary[node.name])
2062         elif field == "sinst_cnt":
2063           val = len(node_to_secondary[node.name])
2064         elif field == "pip":
2065           val = node.primary_ip
2066         elif field == "sip":
2067           val = node.secondary_ip
2068         elif field == "tags":
2069           val = list(node.GetTags())
2070         elif field == "serial_no":
2071           val = node.serial_no
2072         elif field == "master_candidate":
2073           val = node.master_candidate
2074         elif field == "master":
2075           val = node.name == master_node
2076         elif field == "offline":
2077           val = node.offline
2078         elif field == "drained":
2079           val = node.drained
2080         elif self._FIELDS_DYNAMIC.Matches(field):
2081           val = live_data[node.name].get(field, None)
2082         else:
2083           raise errors.ParameterError(field)
2084         node_output.append(val)
2085       output.append(node_output)
2086
2087     return output
2088
2089
2090 class LUQueryNodeVolumes(NoHooksLU):
2091   """Logical unit for getting volumes on node(s).
2092
2093   """
2094   _OP_REQP = ["nodes", "output_fields"]
2095   REQ_BGL = False
2096   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2097   _FIELDS_STATIC = utils.FieldSet("node")
2098
2099   def ExpandNames(self):
2100     _CheckOutputFields(static=self._FIELDS_STATIC,
2101                        dynamic=self._FIELDS_DYNAMIC,
2102                        selected=self.op.output_fields)
2103
2104     self.needed_locks = {}
2105     self.share_locks[locking.LEVEL_NODE] = 1
2106     if not self.op.nodes:
2107       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2108     else:
2109       self.needed_locks[locking.LEVEL_NODE] = \
2110         _GetWantedNodes(self, self.op.nodes)
2111
2112   def CheckPrereq(self):
2113     """Check prerequisites.
2114
2115     This checks that the fields required are valid output fields.
2116
2117     """
2118     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2119
2120   def Exec(self, feedback_fn):
2121     """Computes the list of nodes and their attributes.
2122
2123     """
2124     nodenames = self.nodes
2125     volumes = self.rpc.call_node_volumes(nodenames)
2126
2127     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2128              in self.cfg.GetInstanceList()]
2129
2130     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2131
2132     output = []
2133     for node in nodenames:
2134       nresult = volumes[node]
2135       if nresult.offline:
2136         continue
2137       msg = nresult.RemoteFailMsg()
2138       if msg:
2139         self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
2140         continue
2141
2142       node_vols = nresult.payload[:]
2143       node_vols.sort(key=lambda vol: vol['dev'])
2144
2145       for vol in node_vols:
2146         node_output = []
2147         for field in self.op.output_fields:
2148           if field == "node":
2149             val = node
2150           elif field == "phys":
2151             val = vol['dev']
2152           elif field == "vg":
2153             val = vol['vg']
2154           elif field == "name":
2155             val = vol['name']
2156           elif field == "size":
2157             val = int(float(vol['size']))
2158           elif field == "instance":
2159             for inst in ilist:
2160               if node not in lv_by_node[inst]:
2161                 continue
2162               if vol['name'] in lv_by_node[inst][node]:
2163                 val = inst.name
2164                 break
2165             else:
2166               val = '-'
2167           else:
2168             raise errors.ParameterError(field)
2169           node_output.append(str(val))
2170
2171         output.append(node_output)
2172
2173     return output
2174
2175
2176 class LUAddNode(LogicalUnit):
2177   """Logical unit for adding node to the cluster.
2178
2179   """
2180   HPATH = "node-add"
2181   HTYPE = constants.HTYPE_NODE
2182   _OP_REQP = ["node_name"]
2183
2184   def BuildHooksEnv(self):
2185     """Build hooks env.
2186
2187     This will run on all nodes before, and on all nodes + the new node after.
2188
2189     """
2190     env = {
2191       "OP_TARGET": self.op.node_name,
2192       "NODE_NAME": self.op.node_name,
2193       "NODE_PIP": self.op.primary_ip,
2194       "NODE_SIP": self.op.secondary_ip,
2195       }
2196     nodes_0 = self.cfg.GetNodeList()
2197     nodes_1 = nodes_0 + [self.op.node_name, ]
2198     return env, nodes_0, nodes_1
2199
2200   def CheckPrereq(self):
2201     """Check prerequisites.
2202
2203     This checks:
2204      - the new node is not already in the config
2205      - it is resolvable
2206      - its parameters (single/dual homed) matches the cluster
2207
2208     Any errors are signalled by raising errors.OpPrereqError.
2209
2210     """
2211     node_name = self.op.node_name
2212     cfg = self.cfg
2213
2214     dns_data = utils.HostInfo(node_name)
2215
2216     node = dns_data.name
2217     primary_ip = self.op.primary_ip = dns_data.ip
2218     secondary_ip = getattr(self.op, "secondary_ip", None)
2219     if secondary_ip is None:
2220       secondary_ip = primary_ip
2221     if not utils.IsValidIP(secondary_ip):
2222       raise errors.OpPrereqError("Invalid secondary IP given")
2223     self.op.secondary_ip = secondary_ip
2224
2225     node_list = cfg.GetNodeList()
2226     if not self.op.readd and node in node_list:
2227       raise errors.OpPrereqError("Node %s is already in the configuration" %
2228                                  node)
2229     elif self.op.readd and node not in node_list:
2230       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2231
2232     for existing_node_name in node_list:
2233       existing_node = cfg.GetNodeInfo(existing_node_name)
2234
2235       if self.op.readd and node == existing_node_name:
2236         if (existing_node.primary_ip != primary_ip or
2237             existing_node.secondary_ip != secondary_ip):
2238           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2239                                      " address configuration as before")
2240         continue
2241
2242       if (existing_node.primary_ip == primary_ip or
2243           existing_node.secondary_ip == primary_ip or
2244           existing_node.primary_ip == secondary_ip or
2245           existing_node.secondary_ip == secondary_ip):
2246         raise errors.OpPrereqError("New node ip address(es) conflict with"
2247                                    " existing node %s" % existing_node.name)
2248
2249     # check that the type of the node (single versus dual homed) is the
2250     # same as for the master
2251     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2252     master_singlehomed = myself.secondary_ip == myself.primary_ip
2253     newbie_singlehomed = secondary_ip == primary_ip
2254     if master_singlehomed != newbie_singlehomed:
2255       if master_singlehomed:
2256         raise errors.OpPrereqError("The master has no private ip but the"
2257                                    " new node has one")
2258       else:
2259         raise errors.OpPrereqError("The master has a private ip but the"
2260                                    " new node doesn't have one")
2261
2262     # checks reachablity
2263     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2264       raise errors.OpPrereqError("Node not reachable by ping")
2265
2266     if not newbie_singlehomed:
2267       # check reachability from my secondary ip to newbie's secondary ip
2268       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2269                            source=myself.secondary_ip):
2270         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2271                                    " based ping to noded port")
2272
2273     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2274     mc_now, _ = self.cfg.GetMasterCandidateStats()
2275     master_candidate = mc_now < cp_size
2276
2277     self.new_node = objects.Node(name=node,
2278                                  primary_ip=primary_ip,
2279                                  secondary_ip=secondary_ip,
2280                                  master_candidate=master_candidate,
2281                                  offline=False, drained=False)
2282
2283   def Exec(self, feedback_fn):
2284     """Adds the new node to the cluster.
2285
2286     """
2287     new_node = self.new_node
2288     node = new_node.name
2289
2290     # check connectivity
2291     result = self.rpc.call_version([node])[node]
2292     msg = result.RemoteFailMsg()
2293     if msg:
2294       raise errors.OpExecError("Can't get version information from"
2295                                " node %s: %s" % (node, msg))
2296     if constants.PROTOCOL_VERSION == result.payload:
2297       logging.info("Communication to node %s fine, sw version %s match",
2298                    node, result.payload)
2299     else:
2300       raise errors.OpExecError("Version mismatch master version %s,"
2301                                " node version %s" %
2302                                (constants.PROTOCOL_VERSION, result.payload))
2303
2304     # setup ssh on node
2305     logging.info("Copy ssh key to node %s", node)
2306     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2307     keyarray = []
2308     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2309                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2310                 priv_key, pub_key]
2311
2312     for i in keyfiles:
2313       f = open(i, 'r')
2314       try:
2315         keyarray.append(f.read())
2316       finally:
2317         f.close()
2318
2319     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2320                                     keyarray[2],
2321                                     keyarray[3], keyarray[4], keyarray[5])
2322
2323     msg = result.RemoteFailMsg()
2324     if msg:
2325       raise errors.OpExecError("Cannot transfer ssh keys to the"
2326                                " new node: %s" % msg)
2327
2328     # Add node to our /etc/hosts, and add key to known_hosts
2329     if self.cfg.GetClusterInfo().modify_etc_hosts:
2330       utils.AddHostToEtcHosts(new_node.name)
2331
2332     if new_node.secondary_ip != new_node.primary_ip:
2333       result = self.rpc.call_node_has_ip_address(new_node.name,
2334                                                  new_node.secondary_ip)
2335       msg = result.RemoteFailMsg()
2336       if msg:
2337         raise errors.OpPrereqError("Failure checking secondary ip"
2338                                    " on node %s: %s" % (new_node.name, msg))
2339       if not result.payload:
2340         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2341                                  " you gave (%s). Please fix and re-run this"
2342                                  " command." % new_node.secondary_ip)
2343
2344     node_verify_list = [self.cfg.GetMasterNode()]
2345     node_verify_param = {
2346       'nodelist': [node],
2347       # TODO: do a node-net-test as well?
2348     }
2349
2350     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2351                                        self.cfg.GetClusterName())
2352     for verifier in node_verify_list:
2353       msg = result[verifier].RemoteFailMsg()
2354       if msg:
2355         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2356                                  (verifier, msg))
2357       nl_payload = result[verifier].payload['nodelist']
2358       if nl_payload:
2359         for failed in nl_payload:
2360           feedback_fn("ssh/hostname verification failed %s -> %s" %
2361                       (verifier, nl_payload[failed]))
2362         raise errors.OpExecError("ssh/hostname verification failed.")
2363
2364     if self.op.readd:
2365       _RedistributeAncillaryFiles(self)
2366       self.context.ReaddNode(new_node)
2367     else:
2368       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2369       self.context.AddNode(new_node)
2370
2371
2372 class LUSetNodeParams(LogicalUnit):
2373   """Modifies the parameters of a node.
2374
2375   """
2376   HPATH = "node-modify"
2377   HTYPE = constants.HTYPE_NODE
2378   _OP_REQP = ["node_name"]
2379   REQ_BGL = False
2380
2381   def CheckArguments(self):
2382     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2383     if node_name is None:
2384       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2385     self.op.node_name = node_name
2386     _CheckBooleanOpField(self.op, 'master_candidate')
2387     _CheckBooleanOpField(self.op, 'offline')
2388     _CheckBooleanOpField(self.op, 'drained')
2389     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2390     if all_mods.count(None) == 3:
2391       raise errors.OpPrereqError("Please pass at least one modification")
2392     if all_mods.count(True) > 1:
2393       raise errors.OpPrereqError("Can't set the node into more than one"
2394                                  " state at the same time")
2395
2396   def ExpandNames(self):
2397     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2398
2399   def BuildHooksEnv(self):
2400     """Build hooks env.
2401
2402     This runs on the master node.
2403
2404     """
2405     env = {
2406       "OP_TARGET": self.op.node_name,
2407       "MASTER_CANDIDATE": str(self.op.master_candidate),
2408       "OFFLINE": str(self.op.offline),
2409       "DRAINED": str(self.op.drained),
2410       }
2411     nl = [self.cfg.GetMasterNode(),
2412           self.op.node_name]
2413     return env, nl, nl
2414
2415   def CheckPrereq(self):
2416     """Check prerequisites.
2417
2418     This only checks the instance list against the existing names.
2419
2420     """
2421     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2422
2423     if ((self.op.master_candidate == False or self.op.offline == True or
2424          self.op.drained == True) and node.master_candidate):
2425       # we will demote the node from master_candidate
2426       if self.op.node_name == self.cfg.GetMasterNode():
2427         raise errors.OpPrereqError("The master node has to be a"
2428                                    " master candidate, online and not drained")
2429       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2430       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2431       if num_candidates <= cp_size:
2432         msg = ("Not enough master candidates (desired"
2433                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2434         if self.op.force:
2435           self.LogWarning(msg)
2436         else:
2437           raise errors.OpPrereqError(msg)
2438
2439     if (self.op.master_candidate == True and
2440         ((node.offline and not self.op.offline == False) or
2441          (node.drained and not self.op.drained == False))):
2442       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2443                                  " to master_candidate" % node.name)
2444
2445     return
2446
2447   def Exec(self, feedback_fn):
2448     """Modifies a node.
2449
2450     """
2451     node = self.node
2452
2453     result = []
2454     changed_mc = False
2455
2456     if self.op.offline is not None:
2457       node.offline = self.op.offline
2458       result.append(("offline", str(self.op.offline)))
2459       if self.op.offline == True:
2460         if node.master_candidate:
2461           node.master_candidate = False
2462           changed_mc = True
2463           result.append(("master_candidate", "auto-demotion due to offline"))
2464         if node.drained:
2465           node.drained = False
2466           result.append(("drained", "clear drained status due to offline"))
2467
2468     if self.op.master_candidate is not None:
2469       node.master_candidate = self.op.master_candidate
2470       changed_mc = True
2471       result.append(("master_candidate", str(self.op.master_candidate)))
2472       if self.op.master_candidate == False:
2473         rrc = self.rpc.call_node_demote_from_mc(node.name)
2474         msg = rrc.RemoteFailMsg()
2475         if msg:
2476           self.LogWarning("Node failed to demote itself: %s" % msg)
2477
2478     if self.op.drained is not None:
2479       node.drained = self.op.drained
2480       result.append(("drained", str(self.op.drained)))
2481       if self.op.drained == True:
2482         if node.master_candidate:
2483           node.master_candidate = False
2484           changed_mc = True
2485           result.append(("master_candidate", "auto-demotion due to drain"))
2486         if node.offline:
2487           node.offline = False
2488           result.append(("offline", "clear offline status due to drain"))
2489
2490     # this will trigger configuration file update, if needed
2491     self.cfg.Update(node)
2492     # this will trigger job queue propagation or cleanup
2493     if changed_mc:
2494       self.context.ReaddNode(node)
2495
2496     return result
2497
2498
2499 class LUPowercycleNode(NoHooksLU):
2500   """Powercycles a node.
2501
2502   """
2503   _OP_REQP = ["node_name", "force"]
2504   REQ_BGL = False
2505
2506   def CheckArguments(self):
2507     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2508     if node_name is None:
2509       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2510     self.op.node_name = node_name
2511     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2512       raise errors.OpPrereqError("The node is the master and the force"
2513                                  " parameter was not set")
2514
2515   def ExpandNames(self):
2516     """Locking for PowercycleNode.
2517
2518     This is a last-resource option and shouldn't block on other
2519     jobs. Therefore, we grab no locks.
2520
2521     """
2522     self.needed_locks = {}
2523
2524   def CheckPrereq(self):
2525     """Check prerequisites.
2526
2527     This LU has no prereqs.
2528
2529     """
2530     pass
2531
2532   def Exec(self, feedback_fn):
2533     """Reboots a node.
2534
2535     """
2536     result = self.rpc.call_node_powercycle(self.op.node_name,
2537                                            self.cfg.GetHypervisorType())
2538     msg = result.RemoteFailMsg()
2539     if msg:
2540       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2541     return result.payload
2542
2543
2544 class LUQueryClusterInfo(NoHooksLU):
2545   """Query cluster configuration.
2546
2547   """
2548   _OP_REQP = []
2549   REQ_BGL = False
2550
2551   def ExpandNames(self):
2552     self.needed_locks = {}
2553
2554   def CheckPrereq(self):
2555     """No prerequsites needed for this LU.
2556
2557     """
2558     pass
2559
2560   def Exec(self, feedback_fn):
2561     """Return cluster config.
2562
2563     """
2564     cluster = self.cfg.GetClusterInfo()
2565     result = {
2566       "software_version": constants.RELEASE_VERSION,
2567       "protocol_version": constants.PROTOCOL_VERSION,
2568       "config_version": constants.CONFIG_VERSION,
2569       "os_api_version": constants.OS_API_VERSION,
2570       "export_version": constants.EXPORT_VERSION,
2571       "architecture": (platform.architecture()[0], platform.machine()),
2572       "name": cluster.cluster_name,
2573       "master": cluster.master_node,
2574       "default_hypervisor": cluster.default_hypervisor,
2575       "enabled_hypervisors": cluster.enabled_hypervisors,
2576       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2577                         for hypervisor in cluster.enabled_hypervisors]),
2578       "beparams": cluster.beparams,
2579       "nicparams": cluster.nicparams,
2580       "candidate_pool_size": cluster.candidate_pool_size,
2581       "master_netdev": cluster.master_netdev,
2582       "volume_group_name": cluster.volume_group_name,
2583       "file_storage_dir": cluster.file_storage_dir,
2584       }
2585
2586     return result
2587
2588
2589 class LUQueryConfigValues(NoHooksLU):
2590   """Return configuration values.
2591
2592   """
2593   _OP_REQP = []
2594   REQ_BGL = False
2595   _FIELDS_DYNAMIC = utils.FieldSet()
2596   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2597
2598   def ExpandNames(self):
2599     self.needed_locks = {}
2600
2601     _CheckOutputFields(static=self._FIELDS_STATIC,
2602                        dynamic=self._FIELDS_DYNAMIC,
2603                        selected=self.op.output_fields)
2604
2605   def CheckPrereq(self):
2606     """No prerequisites.
2607
2608     """
2609     pass
2610
2611   def Exec(self, feedback_fn):
2612     """Dump a representation of the cluster config to the standard output.
2613
2614     """
2615     values = []
2616     for field in self.op.output_fields:
2617       if field == "cluster_name":
2618         entry = self.cfg.GetClusterName()
2619       elif field == "master_node":
2620         entry = self.cfg.GetMasterNode()
2621       elif field == "drain_flag":
2622         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2623       else:
2624         raise errors.ParameterError(field)
2625       values.append(entry)
2626     return values
2627
2628
2629 class LUActivateInstanceDisks(NoHooksLU):
2630   """Bring up an instance's disks.
2631
2632   """
2633   _OP_REQP = ["instance_name"]
2634   REQ_BGL = False
2635
2636   def ExpandNames(self):
2637     self._ExpandAndLockInstance()
2638     self.needed_locks[locking.LEVEL_NODE] = []
2639     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2640
2641   def DeclareLocks(self, level):
2642     if level == locking.LEVEL_NODE:
2643       self._LockInstancesNodes()
2644
2645   def CheckPrereq(self):
2646     """Check prerequisites.
2647
2648     This checks that the instance is in the cluster.
2649
2650     """
2651     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2652     assert self.instance is not None, \
2653       "Cannot retrieve locked instance %s" % self.op.instance_name
2654     _CheckNodeOnline(self, self.instance.primary_node)
2655
2656   def Exec(self, feedback_fn):
2657     """Activate the disks.
2658
2659     """
2660     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2661     if not disks_ok:
2662       raise errors.OpExecError("Cannot activate block devices")
2663
2664     return disks_info
2665
2666
2667 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2668   """Prepare the block devices for an instance.
2669
2670   This sets up the block devices on all nodes.
2671
2672   @type lu: L{LogicalUnit}
2673   @param lu: the logical unit on whose behalf we execute
2674   @type instance: L{objects.Instance}
2675   @param instance: the instance for whose disks we assemble
2676   @type ignore_secondaries: boolean
2677   @param ignore_secondaries: if true, errors on secondary nodes
2678       won't result in an error return from the function
2679   @return: False if the operation failed, otherwise a list of
2680       (host, instance_visible_name, node_visible_name)
2681       with the mapping from node devices to instance devices
2682
2683   """
2684   device_info = []
2685   disks_ok = True
2686   iname = instance.name
2687   # With the two passes mechanism we try to reduce the window of
2688   # opportunity for the race condition of switching DRBD to primary
2689   # before handshaking occured, but we do not eliminate it
2690
2691   # The proper fix would be to wait (with some limits) until the
2692   # connection has been made and drbd transitions from WFConnection
2693   # into any other network-connected state (Connected, SyncTarget,
2694   # SyncSource, etc.)
2695
2696   # 1st pass, assemble on all nodes in secondary mode
2697   for inst_disk in instance.disks:
2698     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2699       lu.cfg.SetDiskID(node_disk, node)
2700       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2701       msg = result.RemoteFailMsg()
2702       if msg:
2703         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2704                            " (is_primary=False, pass=1): %s",
2705                            inst_disk.iv_name, node, msg)
2706         if not ignore_secondaries:
2707           disks_ok = False
2708
2709   # FIXME: race condition on drbd migration to primary
2710
2711   # 2nd pass, do only the primary node
2712   for inst_disk in instance.disks:
2713     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2714       if node != instance.primary_node:
2715         continue
2716       lu.cfg.SetDiskID(node_disk, node)
2717       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2718       msg = result.RemoteFailMsg()
2719       if msg:
2720         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2721                            " (is_primary=True, pass=2): %s",
2722                            inst_disk.iv_name, node, msg)
2723         disks_ok = False
2724     device_info.append((instance.primary_node, inst_disk.iv_name,
2725                         result.payload))
2726
2727   # leave the disks configured for the primary node
2728   # this is a workaround that would be fixed better by
2729   # improving the logical/physical id handling
2730   for disk in instance.disks:
2731     lu.cfg.SetDiskID(disk, instance.primary_node)
2732
2733   return disks_ok, device_info
2734
2735
2736 def _StartInstanceDisks(lu, instance, force):
2737   """Start the disks of an instance.
2738
2739   """
2740   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2741                                            ignore_secondaries=force)
2742   if not disks_ok:
2743     _ShutdownInstanceDisks(lu, instance)
2744     if force is not None and not force:
2745       lu.proc.LogWarning("", hint="If the message above refers to a"
2746                          " secondary node,"
2747                          " you can retry the operation using '--force'.")
2748     raise errors.OpExecError("Disk consistency error")
2749
2750
2751 class LUDeactivateInstanceDisks(NoHooksLU):
2752   """Shutdown an instance's disks.
2753
2754   """
2755   _OP_REQP = ["instance_name"]
2756   REQ_BGL = False
2757
2758   def ExpandNames(self):
2759     self._ExpandAndLockInstance()
2760     self.needed_locks[locking.LEVEL_NODE] = []
2761     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2762
2763   def DeclareLocks(self, level):
2764     if level == locking.LEVEL_NODE:
2765       self._LockInstancesNodes()
2766
2767   def CheckPrereq(self):
2768     """Check prerequisites.
2769
2770     This checks that the instance is in the cluster.
2771
2772     """
2773     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2774     assert self.instance is not None, \
2775       "Cannot retrieve locked instance %s" % self.op.instance_name
2776
2777   def Exec(self, feedback_fn):
2778     """Deactivate the disks
2779
2780     """
2781     instance = self.instance
2782     _SafeShutdownInstanceDisks(self, instance)
2783
2784
2785 def _SafeShutdownInstanceDisks(lu, instance):
2786   """Shutdown block devices of an instance.
2787
2788   This function checks if an instance is running, before calling
2789   _ShutdownInstanceDisks.
2790
2791   """
2792   pnode = instance.primary_node
2793   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2794   ins_l = ins_l[pnode]
2795   msg = ins_l.RemoteFailMsg()
2796   if msg:
2797     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2798
2799   if instance.name in ins_l.payload:
2800     raise errors.OpExecError("Instance is running, can't shutdown"
2801                              " block devices.")
2802
2803   _ShutdownInstanceDisks(lu, instance)
2804
2805
2806 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2807   """Shutdown block devices of an instance.
2808
2809   This does the shutdown on all nodes of the instance.
2810
2811   If the ignore_primary is false, errors on the primary node are
2812   ignored.
2813
2814   """
2815   all_result = True
2816   for disk in instance.disks:
2817     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2818       lu.cfg.SetDiskID(top_disk, node)
2819       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2820       msg = result.RemoteFailMsg()
2821       if msg:
2822         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2823                       disk.iv_name, node, msg)
2824         if not ignore_primary or node != instance.primary_node:
2825           all_result = False
2826   return all_result
2827
2828
2829 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2830   """Checks if a node has enough free memory.
2831
2832   This function check if a given node has the needed amount of free
2833   memory. In case the node has less memory or we cannot get the
2834   information from the node, this function raise an OpPrereqError
2835   exception.
2836
2837   @type lu: C{LogicalUnit}
2838   @param lu: a logical unit from which we get configuration data
2839   @type node: C{str}
2840   @param node: the node to check
2841   @type reason: C{str}
2842   @param reason: string to use in the error message
2843   @type requested: C{int}
2844   @param requested: the amount of memory in MiB to check for
2845   @type hypervisor_name: C{str}
2846   @param hypervisor_name: the hypervisor to ask for memory stats
2847   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2848       we cannot check the node
2849
2850   """
2851   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2852   msg = nodeinfo[node].RemoteFailMsg()
2853   if msg:
2854     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2855   free_mem = nodeinfo[node].payload.get('memory_free', None)
2856   if not isinstance(free_mem, int):
2857     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2858                                " was '%s'" % (node, free_mem))
2859   if requested > free_mem:
2860     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2861                                " needed %s MiB, available %s MiB" %
2862                                (node, reason, requested, free_mem))
2863
2864
2865 class LUStartupInstance(LogicalUnit):
2866   """Starts an instance.
2867
2868   """
2869   HPATH = "instance-start"
2870   HTYPE = constants.HTYPE_INSTANCE
2871   _OP_REQP = ["instance_name", "force"]
2872   REQ_BGL = False
2873
2874   def ExpandNames(self):
2875     self._ExpandAndLockInstance()
2876
2877   def BuildHooksEnv(self):
2878     """Build hooks env.
2879
2880     This runs on master, primary and secondary nodes of the instance.
2881
2882     """
2883     env = {
2884       "FORCE": self.op.force,
2885       }
2886     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2887     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2888     return env, nl, nl
2889
2890   def CheckPrereq(self):
2891     """Check prerequisites.
2892
2893     This checks that the instance is in the cluster.
2894
2895     """
2896     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2897     assert self.instance is not None, \
2898       "Cannot retrieve locked instance %s" % self.op.instance_name
2899
2900     # extra beparams
2901     self.beparams = getattr(self.op, "beparams", {})
2902     if self.beparams:
2903       if not isinstance(self.beparams, dict):
2904         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2905                                    " dict" % (type(self.beparams), ))
2906       # fill the beparams dict
2907       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2908       self.op.beparams = self.beparams
2909
2910     # extra hvparams
2911     self.hvparams = getattr(self.op, "hvparams", {})
2912     if self.hvparams:
2913       if not isinstance(self.hvparams, dict):
2914         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2915                                    " dict" % (type(self.hvparams), ))
2916
2917       # check hypervisor parameter syntax (locally)
2918       cluster = self.cfg.GetClusterInfo()
2919       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2920       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2921                                     instance.hvparams)
2922       filled_hvp.update(self.hvparams)
2923       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2924       hv_type.CheckParameterSyntax(filled_hvp)
2925       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2926       self.op.hvparams = self.hvparams
2927
2928     _CheckNodeOnline(self, instance.primary_node)
2929
2930     bep = self.cfg.GetClusterInfo().FillBE(instance)
2931     # check bridges existance
2932     _CheckInstanceBridgesExist(self, instance)
2933
2934     remote_info = self.rpc.call_instance_info(instance.primary_node,
2935                                               instance.name,
2936                                               instance.hypervisor)
2937     msg = remote_info.RemoteFailMsg()
2938     if msg:
2939       raise errors.OpPrereqError("Error checking node %s: %s" %
2940                                  (instance.primary_node, msg))
2941     if not remote_info.payload: # not running already
2942       _CheckNodeFreeMemory(self, instance.primary_node,
2943                            "starting instance %s" % instance.name,
2944                            bep[constants.BE_MEMORY], instance.hypervisor)
2945
2946   def Exec(self, feedback_fn):
2947     """Start the instance.
2948
2949     """
2950     instance = self.instance
2951     force = self.op.force
2952
2953     self.cfg.MarkInstanceUp(instance.name)
2954
2955     node_current = instance.primary_node
2956
2957     _StartInstanceDisks(self, instance, force)
2958
2959     result = self.rpc.call_instance_start(node_current, instance,
2960                                           self.hvparams, self.beparams)
2961     msg = result.RemoteFailMsg()
2962     if msg:
2963       _ShutdownInstanceDisks(self, instance)
2964       raise errors.OpExecError("Could not start instance: %s" % msg)
2965
2966
2967 class LURebootInstance(LogicalUnit):
2968   """Reboot an instance.
2969
2970   """
2971   HPATH = "instance-reboot"
2972   HTYPE = constants.HTYPE_INSTANCE
2973   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2974   REQ_BGL = False
2975
2976   def ExpandNames(self):
2977     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2978                                    constants.INSTANCE_REBOOT_HARD,
2979                                    constants.INSTANCE_REBOOT_FULL]:
2980       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2981                                   (constants.INSTANCE_REBOOT_SOFT,
2982                                    constants.INSTANCE_REBOOT_HARD,
2983                                    constants.INSTANCE_REBOOT_FULL))
2984     self._ExpandAndLockInstance()
2985
2986   def BuildHooksEnv(self):
2987     """Build hooks env.
2988
2989     This runs on master, primary and secondary nodes of the instance.
2990
2991     """
2992     env = {
2993       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2994       "REBOOT_TYPE": self.op.reboot_type,
2995       }
2996     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2997     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2998     return env, nl, nl
2999
3000   def CheckPrereq(self):
3001     """Check prerequisites.
3002
3003     This checks that the instance is in the cluster.
3004
3005     """
3006     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3007     assert self.instance is not None, \
3008       "Cannot retrieve locked instance %s" % self.op.instance_name
3009
3010     _CheckNodeOnline(self, instance.primary_node)
3011
3012     # check bridges existance
3013     _CheckInstanceBridgesExist(self, instance)
3014
3015   def Exec(self, feedback_fn):
3016     """Reboot the instance.
3017
3018     """
3019     instance = self.instance
3020     ignore_secondaries = self.op.ignore_secondaries
3021     reboot_type = self.op.reboot_type
3022
3023     node_current = instance.primary_node
3024
3025     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3026                        constants.INSTANCE_REBOOT_HARD]:
3027       for disk in instance.disks:
3028         self.cfg.SetDiskID(disk, node_current)
3029       result = self.rpc.call_instance_reboot(node_current, instance,
3030                                              reboot_type)
3031       msg = result.RemoteFailMsg()
3032       if msg:
3033         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3034     else:
3035       result = self.rpc.call_instance_shutdown(node_current, instance)
3036       msg = result.RemoteFailMsg()
3037       if msg:
3038         raise errors.OpExecError("Could not shutdown instance for"
3039                                  " full reboot: %s" % msg)
3040       _ShutdownInstanceDisks(self, instance)
3041       _StartInstanceDisks(self, instance, ignore_secondaries)
3042       result = self.rpc.call_instance_start(node_current, instance, None, None)
3043       msg = result.RemoteFailMsg()
3044       if msg:
3045         _ShutdownInstanceDisks(self, instance)
3046         raise errors.OpExecError("Could not start instance for"
3047                                  " full reboot: %s" % msg)
3048
3049     self.cfg.MarkInstanceUp(instance.name)
3050
3051
3052 class LUShutdownInstance(LogicalUnit):
3053   """Shutdown an instance.
3054
3055   """
3056   HPATH = "instance-stop"
3057   HTYPE = constants.HTYPE_INSTANCE
3058   _OP_REQP = ["instance_name"]
3059   REQ_BGL = False
3060
3061   def ExpandNames(self):
3062     self._ExpandAndLockInstance()
3063
3064   def BuildHooksEnv(self):
3065     """Build hooks env.
3066
3067     This runs on master, primary and secondary nodes of the instance.
3068
3069     """
3070     env = _BuildInstanceHookEnvByObject(self, self.instance)
3071     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3072     return env, nl, nl
3073
3074   def CheckPrereq(self):
3075     """Check prerequisites.
3076
3077     This checks that the instance is in the cluster.
3078
3079     """
3080     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3081     assert self.instance is not None, \
3082       "Cannot retrieve locked instance %s" % self.op.instance_name
3083     _CheckNodeOnline(self, self.instance.primary_node)
3084
3085   def Exec(self, feedback_fn):
3086     """Shutdown the instance.
3087
3088     """
3089     instance = self.instance
3090     node_current = instance.primary_node
3091     self.cfg.MarkInstanceDown(instance.name)
3092     result = self.rpc.call_instance_shutdown(node_current, instance)
3093     msg = result.RemoteFailMsg()
3094     if msg:
3095       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3096
3097     _ShutdownInstanceDisks(self, instance)
3098
3099
3100 class LUReinstallInstance(LogicalUnit):
3101   """Reinstall an instance.
3102
3103   """
3104   HPATH = "instance-reinstall"
3105   HTYPE = constants.HTYPE_INSTANCE
3106   _OP_REQP = ["instance_name"]
3107   REQ_BGL = False
3108
3109   def ExpandNames(self):
3110     self._ExpandAndLockInstance()
3111
3112   def BuildHooksEnv(self):
3113     """Build hooks env.
3114
3115     This runs on master, primary and secondary nodes of the instance.
3116
3117     """
3118     env = _BuildInstanceHookEnvByObject(self, self.instance)
3119     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3120     return env, nl, nl
3121
3122   def CheckPrereq(self):
3123     """Check prerequisites.
3124
3125     This checks that the instance is in the cluster and is not running.
3126
3127     """
3128     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3129     assert instance is not None, \
3130       "Cannot retrieve locked instance %s" % self.op.instance_name
3131     _CheckNodeOnline(self, instance.primary_node)
3132
3133     if instance.disk_template == constants.DT_DISKLESS:
3134       raise errors.OpPrereqError("Instance '%s' has no disks" %
3135                                  self.op.instance_name)
3136     if instance.admin_up:
3137       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3138                                  self.op.instance_name)
3139     remote_info = self.rpc.call_instance_info(instance.primary_node,
3140                                               instance.name,
3141                                               instance.hypervisor)
3142     msg = remote_info.RemoteFailMsg()
3143     if msg:
3144       raise errors.OpPrereqError("Error checking node %s: %s" %
3145                                  (instance.primary_node, msg))
3146     if remote_info.payload:
3147       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3148                                  (self.op.instance_name,
3149                                   instance.primary_node))
3150
3151     self.op.os_type = getattr(self.op, "os_type", None)
3152     if self.op.os_type is not None:
3153       # OS verification
3154       pnode = self.cfg.GetNodeInfo(
3155         self.cfg.ExpandNodeName(instance.primary_node))
3156       if pnode is None:
3157         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3158                                    self.op.pnode)
3159       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3160       msg = result.RemoteFailMsg()
3161       if msg:
3162         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3163                                    " primary node %s: %s"  %
3164                                    (self.op.os_type, pnode.pname, msg))
3165
3166     self.instance = instance
3167
3168   def Exec(self, feedback_fn):
3169     """Reinstall the instance.
3170
3171     """
3172     inst = self.instance
3173
3174     if self.op.os_type is not None:
3175       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3176       inst.os = self.op.os_type
3177       self.cfg.Update(inst)
3178
3179     _StartInstanceDisks(self, inst, None)
3180     try:
3181       feedback_fn("Running the instance OS create scripts...")
3182       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3183       msg = result.RemoteFailMsg()
3184       if msg:
3185         raise errors.OpExecError("Could not install OS for instance %s"
3186                                  " on node %s: %s" %
3187                                  (inst.name, inst.primary_node, msg))
3188     finally:
3189       _ShutdownInstanceDisks(self, inst)
3190
3191
3192 class LURenameInstance(LogicalUnit):
3193   """Rename an instance.
3194
3195   """
3196   HPATH = "instance-rename"
3197   HTYPE = constants.HTYPE_INSTANCE
3198   _OP_REQP = ["instance_name", "new_name"]
3199
3200   def BuildHooksEnv(self):
3201     """Build hooks env.
3202
3203     This runs on master, primary and secondary nodes of the instance.
3204
3205     """
3206     env = _BuildInstanceHookEnvByObject(self, self.instance)
3207     env["INSTANCE_NEW_NAME"] = self.op.new_name
3208     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3209     return env, nl, nl
3210
3211   def CheckPrereq(self):
3212     """Check prerequisites.
3213
3214     This checks that the instance is in the cluster and is not running.
3215
3216     """
3217     instance = self.cfg.GetInstanceInfo(
3218       self.cfg.ExpandInstanceName(self.op.instance_name))
3219     if instance is None:
3220       raise errors.OpPrereqError("Instance '%s' not known" %
3221                                  self.op.instance_name)
3222     _CheckNodeOnline(self, instance.primary_node)
3223
3224     if instance.admin_up:
3225       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3226                                  self.op.instance_name)
3227     remote_info = self.rpc.call_instance_info(instance.primary_node,
3228                                               instance.name,
3229                                               instance.hypervisor)
3230     msg = remote_info.RemoteFailMsg()
3231     if msg:
3232       raise errors.OpPrereqError("Error checking node %s: %s" %
3233                                  (instance.primary_node, msg))
3234     if remote_info.payload:
3235       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3236                                  (self.op.instance_name,
3237                                   instance.primary_node))
3238     self.instance = instance
3239
3240     # new name verification
3241     name_info = utils.HostInfo(self.op.new_name)
3242
3243     self.op.new_name = new_name = name_info.name
3244     instance_list = self.cfg.GetInstanceList()
3245     if new_name in instance_list:
3246       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3247                                  new_name)
3248
3249     if not getattr(self.op, "ignore_ip", False):
3250       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3251         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3252                                    (name_info.ip, new_name))
3253
3254
3255   def Exec(self, feedback_fn):
3256     """Reinstall the instance.
3257
3258     """
3259     inst = self.instance
3260     old_name = inst.name
3261
3262     if inst.disk_template == constants.DT_FILE:
3263       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3264
3265     self.cfg.RenameInstance(inst.name, self.op.new_name)
3266     # Change the instance lock. This is definitely safe while we hold the BGL
3267     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3268     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3269
3270     # re-read the instance from the configuration after rename
3271     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3272
3273     if inst.disk_template == constants.DT_FILE:
3274       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3275       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3276                                                      old_file_storage_dir,
3277                                                      new_file_storage_dir)
3278       msg = result.RemoteFailMsg()
3279       if msg:
3280         raise errors.OpExecError("Could not rename on node %s"
3281                                  " directory '%s' to '%s' (but the instance"
3282                                  " has been renamed in Ganeti): %s" %
3283                                  (inst.primary_node, old_file_storage_dir,
3284                                   new_file_storage_dir, msg))
3285
3286     _StartInstanceDisks(self, inst, None)
3287     try:
3288       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3289                                                  old_name)
3290       msg = result.RemoteFailMsg()
3291       if msg:
3292         msg = ("Could not run OS rename script for instance %s on node %s"
3293                " (but the instance has been renamed in Ganeti): %s" %
3294                (inst.name, inst.primary_node, msg))
3295         self.proc.LogWarning(msg)
3296     finally:
3297       _ShutdownInstanceDisks(self, inst)
3298
3299
3300 class LURemoveInstance(LogicalUnit):
3301   """Remove an instance.
3302
3303   """
3304   HPATH = "instance-remove"
3305   HTYPE = constants.HTYPE_INSTANCE
3306   _OP_REQP = ["instance_name", "ignore_failures"]
3307   REQ_BGL = False
3308
3309   def ExpandNames(self):
3310     self._ExpandAndLockInstance()
3311     self.needed_locks[locking.LEVEL_NODE] = []
3312     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3313
3314   def DeclareLocks(self, level):
3315     if level == locking.LEVEL_NODE:
3316       self._LockInstancesNodes()
3317
3318   def BuildHooksEnv(self):
3319     """Build hooks env.
3320
3321     This runs on master, primary and secondary nodes of the instance.
3322
3323     """
3324     env = _BuildInstanceHookEnvByObject(self, self.instance)
3325     nl = [self.cfg.GetMasterNode()]
3326     return env, nl, nl
3327
3328   def CheckPrereq(self):
3329     """Check prerequisites.
3330
3331     This checks that the instance is in the cluster.
3332
3333     """
3334     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3335     assert self.instance is not None, \
3336       "Cannot retrieve locked instance %s" % self.op.instance_name
3337
3338   def Exec(self, feedback_fn):
3339     """Remove the instance.
3340
3341     """
3342     instance = self.instance
3343     logging.info("Shutting down instance %s on node %s",
3344                  instance.name, instance.primary_node)
3345
3346     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3347     msg = result.RemoteFailMsg()
3348     if msg:
3349       if self.op.ignore_failures:
3350         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3351       else:
3352         raise errors.OpExecError("Could not shutdown instance %s on"
3353                                  " node %s: %s" %
3354                                  (instance.name, instance.primary_node, msg))
3355
3356     logging.info("Removing block devices for instance %s", instance.name)
3357
3358     if not _RemoveDisks(self, instance):
3359       if self.op.ignore_failures:
3360         feedback_fn("Warning: can't remove instance's disks")
3361       else:
3362         raise errors.OpExecError("Can't remove instance's disks")
3363
3364     logging.info("Removing instance %s out of cluster config", instance.name)
3365
3366     self.cfg.RemoveInstance(instance.name)
3367     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3368
3369
3370 class LUQueryInstances(NoHooksLU):
3371   """Logical unit for querying instances.
3372
3373   """
3374   _OP_REQP = ["output_fields", "names", "use_locking"]
3375   REQ_BGL = False
3376   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3377                                     "admin_state",
3378                                     "disk_template", "ip", "mac", "bridge",
3379                                     "sda_size", "sdb_size", "vcpus", "tags",
3380                                     "network_port", "beparams",
3381                                     r"(disk)\.(size)/([0-9]+)",
3382                                     r"(disk)\.(sizes)", "disk_usage",
3383                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3384                                     r"(nic)\.(macs|ips|bridges)",
3385                                     r"(disk|nic)\.(count)",
3386                                     "serial_no", "hypervisor", "hvparams",] +
3387                                   ["hv/%s" % name
3388                                    for name in constants.HVS_PARAMETERS] +
3389                                   ["be/%s" % name
3390                                    for name in constants.BES_PARAMETERS])
3391   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3392
3393
3394   def ExpandNames(self):
3395     _CheckOutputFields(static=self._FIELDS_STATIC,
3396                        dynamic=self._FIELDS_DYNAMIC,
3397                        selected=self.op.output_fields)
3398
3399     self.needed_locks = {}
3400     self.share_locks[locking.LEVEL_INSTANCE] = 1
3401     self.share_locks[locking.LEVEL_NODE] = 1
3402
3403     if self.op.names:
3404       self.wanted = _GetWantedInstances(self, self.op.names)
3405     else:
3406       self.wanted = locking.ALL_SET
3407
3408     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3409     self.do_locking = self.do_node_query and self.op.use_locking
3410     if self.do_locking:
3411       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3412       self.needed_locks[locking.LEVEL_NODE] = []
3413       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3414
3415   def DeclareLocks(self, level):
3416     if level == locking.LEVEL_NODE and self.do_locking:
3417       self._LockInstancesNodes()
3418
3419   def CheckPrereq(self):
3420     """Check prerequisites.
3421
3422     """
3423     pass
3424
3425   def Exec(self, feedback_fn):
3426     """Computes the list of nodes and their attributes.
3427
3428     """
3429     all_info = self.cfg.GetAllInstancesInfo()
3430     if self.wanted == locking.ALL_SET:
3431       # caller didn't specify instance names, so ordering is not important
3432       if self.do_locking:
3433         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3434       else:
3435         instance_names = all_info.keys()
3436       instance_names = utils.NiceSort(instance_names)
3437     else:
3438       # caller did specify names, so we must keep the ordering
3439       if self.do_locking:
3440         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3441       else:
3442         tgt_set = all_info.keys()
3443       missing = set(self.wanted).difference(tgt_set)
3444       if missing:
3445         raise errors.OpExecError("Some instances were removed before"
3446                                  " retrieving their data: %s" % missing)
3447       instance_names = self.wanted
3448
3449     instance_list = [all_info[iname] for iname in instance_names]
3450
3451     # begin data gathering
3452
3453     nodes = frozenset([inst.primary_node for inst in instance_list])
3454     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3455
3456     bad_nodes = []
3457     off_nodes = []
3458     if self.do_node_query:
3459       live_data = {}
3460       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3461       for name in nodes:
3462         result = node_data[name]
3463         if result.offline:
3464           # offline nodes will be in both lists
3465           off_nodes.append(name)
3466         if result.failed or result.RemoteFailMsg():
3467           bad_nodes.append(name)
3468         else:
3469           if result.payload:
3470             live_data.update(result.payload)
3471           # else no instance is alive
3472     else:
3473       live_data = dict([(name, {}) for name in instance_names])
3474
3475     # end data gathering
3476
3477     HVPREFIX = "hv/"
3478     BEPREFIX = "be/"
3479     output = []
3480     for instance in instance_list:
3481       iout = []
3482       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3483       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3484       for field in self.op.output_fields:
3485         st_match = self._FIELDS_STATIC.Matches(field)
3486         if field == "name":
3487           val = instance.name
3488         elif field == "os":
3489           val = instance.os
3490         elif field == "pnode":
3491           val = instance.primary_node
3492         elif field == "snodes":
3493           val = list(instance.secondary_nodes)
3494         elif field == "admin_state":
3495           val = instance.admin_up
3496         elif field == "oper_state":
3497           if instance.primary_node in bad_nodes:
3498             val = None
3499           else:
3500             val = bool(live_data.get(instance.name))
3501         elif field == "status":
3502           if instance.primary_node in off_nodes:
3503             val = "ERROR_nodeoffline"
3504           elif instance.primary_node in bad_nodes:
3505             val = "ERROR_nodedown"
3506           else:
3507             running = bool(live_data.get(instance.name))
3508             if running:
3509               if instance.admin_up:
3510                 val = "running"
3511               else:
3512                 val = "ERROR_up"
3513             else:
3514               if instance.admin_up:
3515                 val = "ERROR_down"
3516               else:
3517                 val = "ADMIN_down"
3518         elif field == "oper_ram":
3519           if instance.primary_node in bad_nodes:
3520             val = None
3521           elif instance.name in live_data:
3522             val = live_data[instance.name].get("memory", "?")
3523           else:
3524             val = "-"
3525         elif field == "disk_template":
3526           val = instance.disk_template
3527         elif field == "ip":
3528           val = instance.nics[0].ip
3529         elif field == "bridge":
3530           val = instance.nics[0].bridge
3531         elif field == "mac":
3532           val = instance.nics[0].mac
3533         elif field == "sda_size" or field == "sdb_size":
3534           idx = ord(field[2]) - ord('a')
3535           try:
3536             val = instance.FindDisk(idx).size
3537           except errors.OpPrereqError:
3538             val = None
3539         elif field == "disk_usage": # total disk usage per node
3540           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3541           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3542         elif field == "tags":
3543           val = list(instance.GetTags())
3544         elif field == "serial_no":
3545           val = instance.serial_no
3546         elif field == "network_port":
3547           val = instance.network_port
3548         elif field == "hypervisor":
3549           val = instance.hypervisor
3550         elif field == "hvparams":
3551           val = i_hv
3552         elif (field.startswith(HVPREFIX) and
3553               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3554           val = i_hv.get(field[len(HVPREFIX):], None)
3555         elif field == "beparams":
3556           val = i_be
3557         elif (field.startswith(BEPREFIX) and
3558               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3559           val = i_be.get(field[len(BEPREFIX):], None)
3560         elif st_match and st_match.groups():
3561           # matches a variable list
3562           st_groups = st_match.groups()
3563           if st_groups and st_groups[0] == "disk":
3564             if st_groups[1] == "count":
3565               val = len(instance.disks)
3566             elif st_groups[1] == "sizes":
3567               val = [disk.size for disk in instance.disks]
3568             elif st_groups[1] == "size":
3569               try:
3570                 val = instance.FindDisk(st_groups[2]).size
3571               except errors.OpPrereqError:
3572                 val = None
3573             else:
3574               assert False, "Unhandled disk parameter"
3575           elif st_groups[0] == "nic":
3576             if st_groups[1] == "count":
3577               val = len(instance.nics)
3578             elif st_groups[1] == "macs":
3579               val = [nic.mac for nic in instance.nics]
3580             elif st_groups[1] == "ips":
3581               val = [nic.ip for nic in instance.nics]
3582             elif st_groups[1] == "bridges":
3583               val = [nic.bridge for nic in instance.nics]
3584             else:
3585               # index-based item
3586               nic_idx = int(st_groups[2])
3587               if nic_idx >= len(instance.nics):
3588                 val = None
3589               else:
3590                 if st_groups[1] == "mac":
3591                   val = instance.nics[nic_idx].mac
3592                 elif st_groups[1] == "ip":
3593                   val = instance.nics[nic_idx].ip
3594                 elif st_groups[1] == "bridge":
3595                   val = instance.nics[nic_idx].bridge
3596                 else:
3597                   assert False, "Unhandled NIC parameter"
3598           else:
3599             assert False, "Unhandled variable parameter"
3600         else:
3601           raise errors.ParameterError(field)
3602         iout.append(val)
3603       output.append(iout)
3604
3605     return output
3606
3607
3608 class LUFailoverInstance(LogicalUnit):
3609   """Failover an instance.
3610
3611   """
3612   HPATH = "instance-failover"
3613   HTYPE = constants.HTYPE_INSTANCE
3614   _OP_REQP = ["instance_name", "ignore_consistency"]
3615   REQ_BGL = False
3616
3617   def ExpandNames(self):
3618     self._ExpandAndLockInstance()
3619     self.needed_locks[locking.LEVEL_NODE] = []
3620     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3621
3622   def DeclareLocks(self, level):
3623     if level == locking.LEVEL_NODE:
3624       self._LockInstancesNodes()
3625
3626   def BuildHooksEnv(self):
3627     """Build hooks env.
3628
3629     This runs on master, primary and secondary nodes of the instance.
3630
3631     """
3632     env = {
3633       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3634       }
3635     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3636     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3637     return env, nl, nl
3638
3639   def CheckPrereq(self):
3640     """Check prerequisites.
3641
3642     This checks that the instance is in the cluster.
3643
3644     """
3645     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3646     assert self.instance is not None, \
3647       "Cannot retrieve locked instance %s" % self.op.instance_name
3648
3649     bep = self.cfg.GetClusterInfo().FillBE(instance)
3650     if instance.disk_template not in constants.DTS_NET_MIRROR:
3651       raise errors.OpPrereqError("Instance's disk layout is not"
3652                                  " network mirrored, cannot failover.")
3653
3654     secondary_nodes = instance.secondary_nodes
3655     if not secondary_nodes:
3656       raise errors.ProgrammerError("no secondary node but using "
3657                                    "a mirrored disk template")
3658
3659     target_node = secondary_nodes[0]
3660     _CheckNodeOnline(self, target_node)
3661     _CheckNodeNotDrained(self, target_node)
3662     # check memory requirements on the secondary node
3663     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3664                          instance.name, bep[constants.BE_MEMORY],
3665                          instance.hypervisor)
3666     # check bridge existance
3667     _CheckInstanceBridgesExist(self, instance, node=target_node)
3668
3669   def Exec(self, feedback_fn):
3670     """Failover an instance.
3671
3672     The failover is done by shutting it down on its present node and
3673     starting it on the secondary.
3674
3675     """
3676     instance = self.instance
3677
3678     source_node = instance.primary_node
3679     target_node = instance.secondary_nodes[0]
3680
3681     feedback_fn("* checking disk consistency between source and target")
3682     for dev in instance.disks:
3683       # for drbd, these are drbd over lvm
3684       if not _CheckDiskConsistency(self, dev, target_node, False):
3685         if instance.admin_up and not self.op.ignore_consistency:
3686           raise errors.OpExecError("Disk %s is degraded on target node,"
3687                                    " aborting failover." % dev.iv_name)
3688
3689     feedback_fn("* shutting down instance on source node")
3690     logging.info("Shutting down instance %s on node %s",
3691                  instance.name, source_node)
3692
3693     result = self.rpc.call_instance_shutdown(source_node, instance)
3694     msg = result.RemoteFailMsg()
3695     if msg:
3696       if self.op.ignore_consistency:
3697         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3698                              " Proceeding anyway. Please make sure node"
3699                              " %s is down. Error details: %s",
3700                              instance.name, source_node, source_node, msg)
3701       else:
3702         raise errors.OpExecError("Could not shutdown instance %s on"
3703                                  " node %s: %s" %
3704                                  (instance.name, source_node, msg))
3705
3706     feedback_fn("* deactivating the instance's disks on source node")
3707     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3708       raise errors.OpExecError("Can't shut down the instance's disks.")
3709
3710     instance.primary_node = target_node
3711     # distribute new instance config to the other nodes
3712     self.cfg.Update(instance)
3713
3714     # Only start the instance if it's marked as up
3715     if instance.admin_up:
3716       feedback_fn("* activating the instance's disks on target node")
3717       logging.info("Starting instance %s on node %s",
3718                    instance.name, target_node)
3719
3720       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3721                                                ignore_secondaries=True)
3722       if not disks_ok:
3723         _ShutdownInstanceDisks(self, instance)
3724         raise errors.OpExecError("Can't activate the instance's disks")
3725
3726       feedback_fn("* starting the instance on the target node")
3727       result = self.rpc.call_instance_start(target_node, instance, None, None)
3728       msg = result.RemoteFailMsg()
3729       if msg:
3730         _ShutdownInstanceDisks(self, instance)
3731         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3732                                  (instance.name, target_node, msg))
3733
3734
3735 class LUMigrateInstance(LogicalUnit):
3736   """Migrate an instance.
3737
3738   This is migration without shutting down, compared to the failover,
3739   which is done with shutdown.
3740
3741   """
3742   HPATH = "instance-migrate"
3743   HTYPE = constants.HTYPE_INSTANCE
3744   _OP_REQP = ["instance_name", "live", "cleanup"]
3745
3746   REQ_BGL = False
3747
3748   def ExpandNames(self):
3749     self._ExpandAndLockInstance()
3750     self.needed_locks[locking.LEVEL_NODE] = []
3751     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3752
3753   def DeclareLocks(self, level):
3754     if level == locking.LEVEL_NODE:
3755       self._LockInstancesNodes()
3756
3757   def BuildHooksEnv(self):
3758     """Build hooks env.
3759
3760     This runs on master, primary and secondary nodes of the instance.
3761
3762     """
3763     env = _BuildInstanceHookEnvByObject(self, self.instance)
3764     env["MIGRATE_LIVE"] = self.op.live
3765     env["MIGRATE_CLEANUP"] = self.op.cleanup
3766     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3767     return env, nl, nl
3768
3769   def CheckPrereq(self):
3770     """Check prerequisites.
3771
3772     This checks that the instance is in the cluster.
3773
3774     """
3775     instance = self.cfg.GetInstanceInfo(
3776       self.cfg.ExpandInstanceName(self.op.instance_name))
3777     if instance is None:
3778       raise errors.OpPrereqError("Instance '%s' not known" %
3779                                  self.op.instance_name)
3780
3781     if instance.disk_template != constants.DT_DRBD8:
3782       raise errors.OpPrereqError("Instance's disk layout is not"
3783                                  " drbd8, cannot migrate.")
3784
3785     secondary_nodes = instance.secondary_nodes
3786     if not secondary_nodes:
3787       raise errors.ConfigurationError("No secondary node but using"
3788                                       " drbd8 disk template")
3789
3790     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3791
3792     target_node = secondary_nodes[0]
3793     # check memory requirements on the secondary node
3794     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3795                          instance.name, i_be[constants.BE_MEMORY],
3796                          instance.hypervisor)
3797
3798     # check bridge existance
3799     _CheckInstanceBridgesExist(self, instance, node=target_node)
3800
3801     if not self.op.cleanup:
3802       _CheckNodeNotDrained(self, target_node)
3803       result = self.rpc.call_instance_migratable(instance.primary_node,
3804                                                  instance)
3805       msg = result.RemoteFailMsg()
3806       if msg:
3807         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3808                                    msg)
3809
3810     self.instance = instance
3811
3812   def _WaitUntilSync(self):
3813     """Poll with custom rpc for disk sync.
3814
3815     This uses our own step-based rpc call.
3816
3817     """
3818     self.feedback_fn("* wait until resync is done")
3819     all_done = False
3820     while not all_done:
3821       all_done = True
3822       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3823                                             self.nodes_ip,
3824                                             self.instance.disks)
3825       min_percent = 100
3826       for node, nres in result.items():
3827         msg = nres.RemoteFailMsg()
3828         if msg:
3829           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3830                                    (node, msg))
3831         node_done, node_percent = nres.payload
3832         all_done = all_done and node_done
3833         if node_percent is not None:
3834           min_percent = min(min_percent, node_percent)
3835       if not all_done:
3836         if min_percent < 100:
3837           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3838         time.sleep(2)
3839
3840   def _EnsureSecondary(self, node):
3841     """Demote a node to secondary.
3842
3843     """
3844     self.feedback_fn("* switching node %s to secondary mode" % node)
3845
3846     for dev in self.instance.disks:
3847       self.cfg.SetDiskID(dev, node)
3848
3849     result = self.rpc.call_blockdev_close(node, self.instance.name,
3850                                           self.instance.disks)
3851     msg = result.RemoteFailMsg()
3852     if msg:
3853       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3854                                " error %s" % (node, msg))
3855
3856   def _GoStandalone(self):
3857     """Disconnect from the network.
3858
3859     """
3860     self.feedback_fn("* changing into standalone mode")
3861     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3862                                                self.instance.disks)
3863     for node, nres in result.items():
3864       msg = nres.RemoteFailMsg()
3865       if msg:
3866         raise errors.OpExecError("Cannot disconnect disks node %s,"
3867                                  " error %s" % (node, msg))
3868
3869   def _GoReconnect(self, multimaster):
3870     """Reconnect to the network.
3871
3872     """
3873     if multimaster:
3874       msg = "dual-master"
3875     else:
3876       msg = "single-master"
3877     self.feedback_fn("* changing disks into %s mode" % msg)
3878     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3879                                            self.instance.disks,
3880                                            self.instance.name, multimaster)
3881     for node, nres in result.items():
3882       msg = nres.RemoteFailMsg()
3883       if msg:
3884         raise errors.OpExecError("Cannot change disks config on node %s,"
3885                                  " error: %s" % (node, msg))
3886
3887   def _ExecCleanup(self):
3888     """Try to cleanup after a failed migration.
3889
3890     The cleanup is done by:
3891       - check that the instance is running only on one node
3892         (and update the config if needed)
3893       - change disks on its secondary node to secondary
3894       - wait until disks are fully synchronized
3895       - disconnect from the network
3896       - change disks into single-master mode
3897       - wait again until disks are fully synchronized
3898
3899     """
3900     instance = self.instance
3901     target_node = self.target_node
3902     source_node = self.source_node
3903
3904     # check running on only one node
3905     self.feedback_fn("* checking where the instance actually runs"
3906                      " (if this hangs, the hypervisor might be in"
3907                      " a bad state)")
3908     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3909     for node, result in ins_l.items():
3910       msg = result.RemoteFailMsg()
3911       if msg:
3912         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3913
3914     runningon_source = instance.name in ins_l[source_node].payload
3915     runningon_target = instance.name in ins_l[target_node].payload
3916
3917     if runningon_source and runningon_target:
3918       raise errors.OpExecError("Instance seems to be running on two nodes,"
3919                                " or the hypervisor is confused. You will have"
3920                                " to ensure manually that it runs only on one"
3921                                " and restart this operation.")
3922
3923     if not (runningon_source or runningon_target):
3924       raise errors.OpExecError("Instance does not seem to be running at all."
3925                                " In this case, it's safer to repair by"
3926                                " running 'gnt-instance stop' to ensure disk"
3927                                " shutdown, and then restarting it.")
3928
3929     if runningon_target:
3930       # the migration has actually succeeded, we need to update the config
3931       self.feedback_fn("* instance running on secondary node (%s),"
3932                        " updating config" % target_node)
3933       instance.primary_node = target_node
3934       self.cfg.Update(instance)
3935       demoted_node = source_node
3936     else:
3937       self.feedback_fn("* instance confirmed to be running on its"
3938                        " primary node (%s)" % source_node)
3939       demoted_node = target_node
3940
3941     self._EnsureSecondary(demoted_node)
3942     try:
3943       self._WaitUntilSync()
3944     except errors.OpExecError:
3945       # we ignore here errors, since if the device is standalone, it
3946       # won't be able to sync
3947       pass
3948     self._GoStandalone()
3949     self._GoReconnect(False)
3950     self._WaitUntilSync()
3951
3952     self.feedback_fn("* done")
3953
3954   def _RevertDiskStatus(self):
3955     """Try to revert the disk status after a failed migration.
3956
3957     """
3958     target_node = self.target_node
3959     try:
3960       self._EnsureSecondary(target_node)
3961       self._GoStandalone()
3962       self._GoReconnect(False)
3963       self._WaitUntilSync()
3964     except errors.OpExecError, err:
3965       self.LogWarning("Migration failed and I can't reconnect the"
3966                       " drives: error '%s'\n"
3967                       "Please look and recover the instance status" %
3968                       str(err))
3969
3970   def _AbortMigration(self):
3971     """Call the hypervisor code to abort a started migration.
3972
3973     """
3974     instance = self.instance
3975     target_node = self.target_node
3976     migration_info = self.migration_info
3977
3978     abort_result = self.rpc.call_finalize_migration(target_node,
3979                                                     instance,
3980                                                     migration_info,
3981                                                     False)
3982     abort_msg = abort_result.RemoteFailMsg()
3983     if abort_msg:
3984       logging.error("Aborting migration failed on target node %s: %s" %
3985                     (target_node, abort_msg))
3986       # Don't raise an exception here, as we stil have to try to revert the
3987       # disk status, even if this step failed.
3988
3989   def _ExecMigration(self):
3990     """Migrate an instance.
3991
3992     The migrate is done by:
3993       - change the disks into dual-master mode
3994       - wait until disks are fully synchronized again
3995       - migrate the instance
3996       - change disks on the new secondary node (the old primary) to secondary
3997       - wait until disks are fully synchronized
3998       - change disks into single-master mode
3999
4000     """
4001     instance = self.instance
4002     target_node = self.target_node
4003     source_node = self.source_node
4004
4005     self.feedback_fn("* checking disk consistency between source and target")
4006     for dev in instance.disks:
4007       if not _CheckDiskConsistency(self, dev, target_node, False):
4008         raise errors.OpExecError("Disk %s is degraded or not fully"
4009                                  " synchronized on target node,"
4010                                  " aborting migrate." % dev.iv_name)
4011
4012     # First get the migration information from the remote node
4013     result = self.rpc.call_migration_info(source_node, instance)
4014     msg = result.RemoteFailMsg()
4015     if msg:
4016       log_err = ("Failed fetching source migration information from %s: %s" %
4017                  (source_node, msg))
4018       logging.error(log_err)
4019       raise errors.OpExecError(log_err)
4020
4021     self.migration_info = migration_info = result.payload
4022
4023     # Then switch the disks to master/master mode
4024     self._EnsureSecondary(target_node)
4025     self._GoStandalone()
4026     self._GoReconnect(True)
4027     self._WaitUntilSync()
4028
4029     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4030     result = self.rpc.call_accept_instance(target_node,
4031                                            instance,
4032                                            migration_info,
4033                                            self.nodes_ip[target_node])
4034
4035     msg = result.RemoteFailMsg()
4036     if msg:
4037       logging.error("Instance pre-migration failed, trying to revert"
4038                     " disk status: %s", msg)
4039       self._AbortMigration()
4040       self._RevertDiskStatus()
4041       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4042                                (instance.name, msg))
4043
4044     self.feedback_fn("* migrating instance to %s" % target_node)
4045     time.sleep(10)
4046     result = self.rpc.call_instance_migrate(source_node, instance,
4047                                             self.nodes_ip[target_node],
4048                                             self.op.live)
4049     msg = result.RemoteFailMsg()
4050     if msg:
4051       logging.error("Instance migration failed, trying to revert"
4052                     " disk status: %s", msg)
4053       self._AbortMigration()
4054       self._RevertDiskStatus()
4055       raise errors.OpExecError("Could not migrate instance %s: %s" %
4056                                (instance.name, msg))
4057     time.sleep(10)
4058
4059     instance.primary_node = target_node
4060     # distribute new instance config to the other nodes
4061     self.cfg.Update(instance)
4062
4063     result = self.rpc.call_finalize_migration(target_node,
4064                                               instance,
4065                                               migration_info,
4066                                               True)
4067     msg = result.RemoteFailMsg()
4068     if msg:
4069       logging.error("Instance migration succeeded, but finalization failed:"
4070                     " %s" % msg)
4071       raise errors.OpExecError("Could not finalize instance migration: %s" %
4072                                msg)
4073
4074     self._EnsureSecondary(source_node)
4075     self._WaitUntilSync()
4076     self._GoStandalone()
4077     self._GoReconnect(False)
4078     self._WaitUntilSync()
4079
4080     self.feedback_fn("* done")
4081
4082   def Exec(self, feedback_fn):
4083     """Perform the migration.
4084
4085     """
4086     self.feedback_fn = feedback_fn
4087
4088     self.source_node = self.instance.primary_node
4089     self.target_node = self.instance.secondary_nodes[0]
4090     self.all_nodes = [self.source_node, self.target_node]
4091     self.nodes_ip = {
4092       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4093       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4094       }
4095     if self.op.cleanup:
4096       return self._ExecCleanup()
4097     else:
4098       return self._ExecMigration()
4099
4100
4101 def _CreateBlockDev(lu, node, instance, device, force_create,
4102                     info, force_open):
4103   """Create a tree of block devices on a given node.
4104
4105   If this device type has to be created on secondaries, create it and
4106   all its children.
4107
4108   If not, just recurse to children keeping the same 'force' value.
4109
4110   @param lu: the lu on whose behalf we execute
4111   @param node: the node on which to create the device
4112   @type instance: L{objects.Instance}
4113   @param instance: the instance which owns the device
4114   @type device: L{objects.Disk}
4115   @param device: the device to create
4116   @type force_create: boolean
4117   @param force_create: whether to force creation of this device; this
4118       will be change to True whenever we find a device which has
4119       CreateOnSecondary() attribute
4120   @param info: the extra 'metadata' we should attach to the device
4121       (this will be represented as a LVM tag)
4122   @type force_open: boolean
4123   @param force_open: this parameter will be passes to the
4124       L{backend.BlockdevCreate} function where it specifies
4125       whether we run on primary or not, and it affects both
4126       the child assembly and the device own Open() execution
4127
4128   """
4129   if device.CreateOnSecondary():
4130     force_create = True
4131
4132   if device.children:
4133     for child in device.children:
4134       _CreateBlockDev(lu, node, instance, child, force_create,
4135                       info, force_open)
4136
4137   if not force_create:
4138     return
4139
4140   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4141
4142
4143 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4144   """Create a single block device on a given node.
4145
4146   This will not recurse over children of the device, so they must be
4147   created in advance.
4148
4149   @param lu: the lu on whose behalf we execute
4150   @param node: the node on which to create the device
4151   @type instance: L{objects.Instance}
4152   @param instance: the instance which owns the device
4153   @type device: L{objects.Disk}
4154   @param device: the device to create
4155   @param info: the extra 'metadata' we should attach to the device
4156       (this will be represented as a LVM tag)
4157   @type force_open: boolean
4158   @param force_open: this parameter will be passes to the
4159       L{backend.BlockdevCreate} function where it specifies
4160       whether we run on primary or not, and it affects both
4161       the child assembly and the device own Open() execution
4162
4163   """
4164   lu.cfg.SetDiskID(device, node)
4165   result = lu.rpc.call_blockdev_create(node, device, device.size,
4166                                        instance.name, force_open, info)
4167   msg = result.RemoteFailMsg()
4168   if msg:
4169     raise errors.OpExecError("Can't create block device %s on"
4170                              " node %s for instance %s: %s" %
4171                              (device, node, instance.name, msg))
4172   if device.physical_id is None:
4173     device.physical_id = result.payload
4174
4175
4176 def _GenerateUniqueNames(lu, exts):
4177   """Generate a suitable LV name.
4178
4179   This will generate a logical volume name for the given instance.
4180
4181   """
4182   results = []
4183   for val in exts:
4184     new_id = lu.cfg.GenerateUniqueID()
4185     results.append("%s%s" % (new_id, val))
4186   return results
4187
4188
4189 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4190                          p_minor, s_minor):
4191   """Generate a drbd8 device complete with its children.
4192
4193   """
4194   port = lu.cfg.AllocatePort()
4195   vgname = lu.cfg.GetVGName()
4196   shared_secret = lu.cfg.GenerateDRBDSecret()
4197   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4198                           logical_id=(vgname, names[0]))
4199   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4200                           logical_id=(vgname, names[1]))
4201   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4202                           logical_id=(primary, secondary, port,
4203                                       p_minor, s_minor,
4204                                       shared_secret),
4205                           children=[dev_data, dev_meta],
4206                           iv_name=iv_name)
4207   return drbd_dev
4208
4209
4210 def _GenerateDiskTemplate(lu, template_name,
4211                           instance_name, primary_node,
4212                           secondary_nodes, disk_info,
4213                           file_storage_dir, file_driver,
4214                           base_index):
4215   """Generate the entire disk layout for a given template type.
4216
4217   """
4218   #TODO: compute space requirements
4219
4220   vgname = lu.cfg.GetVGName()
4221   disk_count = len(disk_info)
4222   disks = []
4223   if template_name == constants.DT_DISKLESS:
4224     pass
4225   elif template_name == constants.DT_PLAIN:
4226     if len(secondary_nodes) != 0:
4227       raise errors.ProgrammerError("Wrong template configuration")
4228
4229     names = _GenerateUniqueNames(lu, [".disk%d" % i
4230                                       for i in range(disk_count)])
4231     for idx, disk in enumerate(disk_info):
4232       disk_index = idx + base_index
4233       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4234                               logical_id=(vgname, names[idx]),
4235                               iv_name="disk/%d" % disk_index,
4236                               mode=disk["mode"])
4237       disks.append(disk_dev)
4238   elif template_name == constants.DT_DRBD8:
4239     if len(secondary_nodes) != 1:
4240       raise errors.ProgrammerError("Wrong template configuration")
4241     remote_node = secondary_nodes[0]
4242     minors = lu.cfg.AllocateDRBDMinor(
4243       [primary_node, remote_node] * len(disk_info), instance_name)
4244
4245     names = []
4246     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4247                                                for i in range(disk_count)]):
4248       names.append(lv_prefix + "_data")
4249       names.append(lv_prefix + "_meta")
4250     for idx, disk in enumerate(disk_info):
4251       disk_index = idx + base_index
4252       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4253                                       disk["size"], names[idx*2:idx*2+2],
4254                                       "disk/%d" % disk_index,
4255                                       minors[idx*2], minors[idx*2+1])
4256       disk_dev.mode = disk["mode"]
4257       disks.append(disk_dev)
4258   elif template_name == constants.DT_FILE:
4259     if len(secondary_nodes) != 0:
4260       raise errors.ProgrammerError("Wrong template configuration")
4261
4262     for idx, disk in enumerate(disk_info):
4263       disk_index = idx + base_index
4264       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4265                               iv_name="disk/%d" % disk_index,
4266                               logical_id=(file_driver,
4267                                           "%s/disk%d" % (file_storage_dir,
4268                                                          disk_index)),
4269                               mode=disk["mode"])
4270       disks.append(disk_dev)
4271   else:
4272     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4273   return disks
4274
4275
4276 def _GetInstanceInfoText(instance):
4277   """Compute that text that should be added to the disk's metadata.
4278
4279   """
4280   return "originstname+%s" % instance.name
4281
4282
4283 def _CreateDisks(lu, instance):
4284   """Create all disks for an instance.
4285
4286   This abstracts away some work from AddInstance.
4287
4288   @type lu: L{LogicalUnit}
4289   @param lu: the logical unit on whose behalf we execute
4290   @type instance: L{objects.Instance}
4291   @param instance: the instance whose disks we should create
4292   @rtype: boolean
4293   @return: the success of the creation
4294
4295   """
4296   info = _GetInstanceInfoText(instance)
4297   pnode = instance.primary_node
4298
4299   if instance.disk_template == constants.DT_FILE:
4300     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4301     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4302
4303     msg = result.RemoteFailMsg()
4304
4305     if msg:
4306       raise errors.OpExecError("Failed to create directory '%s' on"
4307                                " node %s: %s" % (file_storage_dir, msg))
4308
4309   # Note: this needs to be kept in sync with adding of disks in
4310   # LUSetInstanceParams
4311   for device in instance.disks:
4312     logging.info("Creating volume %s for instance %s",
4313                  device.iv_name, instance.name)
4314     #HARDCODE
4315     for node in instance.all_nodes:
4316       f_create = node == pnode
4317       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4318
4319
4320 def _RemoveDisks(lu, instance):
4321   """Remove all disks for an instance.
4322
4323   This abstracts away some work from `AddInstance()` and
4324   `RemoveInstance()`. Note that in case some of the devices couldn't
4325   be removed, the removal will continue with the other ones (compare
4326   with `_CreateDisks()`).
4327
4328   @type lu: L{LogicalUnit}
4329   @param lu: the logical unit on whose behalf we execute
4330   @type instance: L{objects.Instance}
4331   @param instance: the instance whose disks we should remove
4332   @rtype: boolean
4333   @return: the success of the removal
4334
4335   """
4336   logging.info("Removing block devices for instance %s", instance.name)
4337
4338   all_result = True
4339   for device in instance.disks:
4340     for node, disk in device.ComputeNodeTree(instance.primary_node):
4341       lu.cfg.SetDiskID(disk, node)
4342       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4343       if msg:
4344         lu.LogWarning("Could not remove block device %s on node %s,"
4345                       " continuing anyway: %s", device.iv_name, node, msg)
4346         all_result = False
4347
4348   if instance.disk_template == constants.DT_FILE:
4349     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4350     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4351                                                  file_storage_dir)
4352     msg = result.RemoteFailMsg()
4353     if msg:
4354       lu.LogWarning("Could not remove directory '%s' on node %s: %s",
4355                     file_storage_dir, instance.primary_node, msg)
4356       all_result = False
4357
4358   return all_result
4359
4360
4361 def _ComputeDiskSize(disk_template, disks):
4362   """Compute disk size requirements in the volume group
4363
4364   """
4365   # Required free disk space as a function of disk and swap space
4366   req_size_dict = {
4367     constants.DT_DISKLESS: None,
4368     constants.DT_PLAIN: sum(d["size"] for d in disks),
4369     # 128 MB are added for drbd metadata for each disk
4370     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4371     constants.DT_FILE: None,
4372   }
4373
4374   if disk_template not in req_size_dict:
4375     raise errors.ProgrammerError("Disk template '%s' size requirement"
4376                                  " is unknown" %  disk_template)
4377
4378   return req_size_dict[disk_template]
4379
4380
4381 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4382   """Hypervisor parameter validation.
4383
4384   This function abstract the hypervisor parameter validation to be
4385   used in both instance create and instance modify.
4386
4387   @type lu: L{LogicalUnit}
4388   @param lu: the logical unit for which we check
4389   @type nodenames: list
4390   @param nodenames: the list of nodes on which we should check
4391   @type hvname: string
4392   @param hvname: the name of the hypervisor we should use
4393   @type hvparams: dict
4394   @param hvparams: the parameters which we need to check
4395   @raise errors.OpPrereqError: if the parameters are not valid
4396
4397   """
4398   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4399                                                   hvname,
4400                                                   hvparams)
4401   for node in nodenames:
4402     info = hvinfo[node]
4403     if info.offline:
4404       continue
4405     msg = info.RemoteFailMsg()
4406     if msg:
4407       raise errors.OpPrereqError("Hypervisor parameter validation"
4408                                  " failed on node %s: %s" % (node, msg))
4409
4410
4411 class LUCreateInstance(LogicalUnit):
4412   """Create an instance.
4413
4414   """
4415   HPATH = "instance-add"
4416   HTYPE = constants.HTYPE_INSTANCE
4417   _OP_REQP = ["instance_name", "disks", "disk_template",
4418               "mode", "start",
4419               "wait_for_sync", "ip_check", "nics",
4420               "hvparams", "beparams"]
4421   REQ_BGL = False
4422
4423   def _ExpandNode(self, node):
4424     """Expands and checks one node name.
4425
4426     """
4427     node_full = self.cfg.ExpandNodeName(node)
4428     if node_full is None:
4429       raise errors.OpPrereqError("Unknown node %s" % node)
4430     return node_full
4431
4432   def ExpandNames(self):
4433     """ExpandNames for CreateInstance.
4434
4435     Figure out the right locks for instance creation.
4436
4437     """
4438     self.needed_locks = {}
4439
4440     # set optional parameters to none if they don't exist
4441     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4442       if not hasattr(self.op, attr):
4443         setattr(self.op, attr, None)
4444
4445     # cheap checks, mostly valid constants given
4446
4447     # verify creation mode
4448     if self.op.mode not in (constants.INSTANCE_CREATE,
4449                             constants.INSTANCE_IMPORT):
4450       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4451                                  self.op.mode)
4452
4453     # disk template and mirror node verification
4454     if self.op.disk_template not in constants.DISK_TEMPLATES:
4455       raise errors.OpPrereqError("Invalid disk template name")
4456
4457     if self.op.hypervisor is None:
4458       self.op.hypervisor = self.cfg.GetHypervisorType()
4459
4460     cluster = self.cfg.GetClusterInfo()
4461     enabled_hvs = cluster.enabled_hypervisors
4462     if self.op.hypervisor not in enabled_hvs:
4463       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4464                                  " cluster (%s)" % (self.op.hypervisor,
4465                                   ",".join(enabled_hvs)))
4466
4467     # check hypervisor parameter syntax (locally)
4468     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4469     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4470                                   self.op.hvparams)
4471     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4472     hv_type.CheckParameterSyntax(filled_hvp)
4473
4474     # fill and remember the beparams dict
4475     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4476     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4477                                     self.op.beparams)
4478
4479     #### instance parameters check
4480
4481     # instance name verification
4482     hostname1 = utils.HostInfo(self.op.instance_name)
4483     self.op.instance_name = instance_name = hostname1.name
4484
4485     # this is just a preventive check, but someone might still add this
4486     # instance in the meantime, and creation will fail at lock-add time
4487     if instance_name in self.cfg.GetInstanceList():
4488       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4489                                  instance_name)
4490
4491     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4492
4493     # NIC buildup
4494     self.nics = []
4495     for idx, nic in enumerate(self.op.nics):
4496       nic_mode_req = nic.get("mode", None)
4497       nic_mode = nic_mode_req
4498       if nic_mode is None:
4499         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4500
4501       # in routed mode, for the first nic, the default ip is 'auto'
4502       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4503         default_ip_mode = constants.VALUE_AUTO
4504       else:
4505         default_ip_mode = constants.VALUE_NONE
4506
4507       # ip validity checks
4508       ip = nic.get("ip", default_ip_mode)
4509       if ip is None or ip.lower() == constants.VALUE_NONE:
4510         nic_ip = None
4511       elif ip.lower() == constants.VALUE_AUTO:
4512         nic_ip = hostname1.ip
4513       else:
4514         if not utils.IsValidIP(ip):
4515           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4516                                      " like a valid IP" % ip)
4517         nic_ip = ip
4518
4519       # TODO: check the ip for uniqueness !!
4520       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4521         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4522
4523       # MAC address verification
4524       mac = nic.get("mac", constants.VALUE_AUTO)
4525       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4526         if not utils.IsValidMac(mac.lower()):
4527           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4528                                      mac)
4529       # bridge verification
4530       bridge = nic.get("bridge", None)
4531       link = nic.get("link", None)
4532       if bridge and link:
4533         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4534       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4535         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4536       elif bridge:
4537         link = bridge
4538
4539       nicparams = {}
4540       if nic_mode_req:
4541         nicparams[constants.NIC_MODE] = nic_mode_req
4542       if link:
4543         nicparams[constants.NIC_LINK] = link
4544
4545       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4546                                       nicparams)
4547       objects.NIC.CheckParameterSyntax(check_params)
4548       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4549
4550     # disk checks/pre-build
4551     self.disks = []
4552     for disk in self.op.disks:
4553       mode = disk.get("mode", constants.DISK_RDWR)
4554       if mode not in constants.DISK_ACCESS_SET:
4555         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4556                                    mode)
4557       size = disk.get("size", None)
4558       if size is None:
4559         raise errors.OpPrereqError("Missing disk size")
4560       try:
4561         size = int(size)
4562       except ValueError:
4563         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4564       self.disks.append({"size": size, "mode": mode})
4565
4566     # used in CheckPrereq for ip ping check
4567     self.check_ip = hostname1.ip
4568
4569     # file storage checks
4570     if (self.op.file_driver and
4571         not self.op.file_driver in constants.FILE_DRIVER):
4572       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4573                                  self.op.file_driver)
4574
4575     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4576       raise errors.OpPrereqError("File storage directory path not absolute")
4577
4578     ### Node/iallocator related checks
4579     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4580       raise errors.OpPrereqError("One and only one of iallocator and primary"
4581                                  " node must be given")
4582
4583     if self.op.iallocator:
4584       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4585     else:
4586       self.op.pnode = self._ExpandNode(self.op.pnode)
4587       nodelist = [self.op.pnode]
4588       if self.op.snode is not None:
4589         self.op.snode = self._ExpandNode(self.op.snode)
4590         nodelist.append(self.op.snode)
4591       self.needed_locks[locking.LEVEL_NODE] = nodelist
4592
4593     # in case of import lock the source node too
4594     if self.op.mode == constants.INSTANCE_IMPORT:
4595       src_node = getattr(self.op, "src_node", None)
4596       src_path = getattr(self.op, "src_path", None)
4597
4598       if src_path is None:
4599         self.op.src_path = src_path = self.op.instance_name
4600
4601       if src_node is None:
4602         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4603         self.op.src_node = None
4604         if os.path.isabs(src_path):
4605           raise errors.OpPrereqError("Importing an instance from an absolute"
4606                                      " path requires a source node option.")
4607       else:
4608         self.op.src_node = src_node = self._ExpandNode(src_node)
4609         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4610           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4611         if not os.path.isabs(src_path):
4612           self.op.src_path = src_path = \
4613             os.path.join(constants.EXPORT_DIR, src_path)
4614
4615     else: # INSTANCE_CREATE
4616       if getattr(self.op, "os_type", None) is None:
4617         raise errors.OpPrereqError("No guest OS specified")
4618
4619   def _RunAllocator(self):
4620     """Run the allocator based on input opcode.
4621
4622     """
4623     nics = [n.ToDict() for n in self.nics]
4624     ial = IAllocator(self,
4625                      mode=constants.IALLOCATOR_MODE_ALLOC,
4626                      name=self.op.instance_name,
4627                      disk_template=self.op.disk_template,
4628                      tags=[],
4629                      os=self.op.os_type,
4630                      vcpus=self.be_full[constants.BE_VCPUS],
4631                      mem_size=self.be_full[constants.BE_MEMORY],
4632                      disks=self.disks,
4633                      nics=nics,
4634                      hypervisor=self.op.hypervisor,
4635                      )
4636
4637     ial.Run(self.op.iallocator)
4638
4639     if not ial.success:
4640       raise errors.OpPrereqError("Can't compute nodes using"
4641                                  " iallocator '%s': %s" % (self.op.iallocator,
4642                                                            ial.info))
4643     if len(ial.nodes) != ial.required_nodes:
4644       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4645                                  " of nodes (%s), required %s" %
4646                                  (self.op.iallocator, len(ial.nodes),
4647                                   ial.required_nodes))
4648     self.op.pnode = ial.nodes[0]
4649     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4650                  self.op.instance_name, self.op.iallocator,
4651                  ", ".join(ial.nodes))
4652     if ial.required_nodes == 2:
4653       self.op.snode = ial.nodes[1]
4654
4655   def BuildHooksEnv(self):
4656     """Build hooks env.
4657
4658     This runs on master, primary and secondary nodes of the instance.
4659
4660     """
4661     env = {
4662       "ADD_MODE": self.op.mode,
4663       }
4664     if self.op.mode == constants.INSTANCE_IMPORT:
4665       env["SRC_NODE"] = self.op.src_node
4666       env["SRC_PATH"] = self.op.src_path
4667       env["SRC_IMAGES"] = self.src_images
4668
4669     env.update(_BuildInstanceHookEnv(
4670       name=self.op.instance_name,
4671       primary_node=self.op.pnode,
4672       secondary_nodes=self.secondaries,
4673       status=self.op.start,
4674       os_type=self.op.os_type,
4675       memory=self.be_full[constants.BE_MEMORY],
4676       vcpus=self.be_full[constants.BE_VCPUS],
4677       nics=_PreBuildNICHooksList(self, self.nics),
4678       disk_template=self.op.disk_template,
4679       disks=[(d["size"], d["mode"]) for d in self.disks],
4680     ))
4681
4682     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4683           self.secondaries)
4684     return env, nl, nl
4685
4686
4687   def CheckPrereq(self):
4688     """Check prerequisites.
4689
4690     """
4691     if (not self.cfg.GetVGName() and
4692         self.op.disk_template not in constants.DTS_NOT_LVM):
4693       raise errors.OpPrereqError("Cluster does not support lvm-based"
4694                                  " instances")
4695
4696     if self.op.mode == constants.INSTANCE_IMPORT:
4697       src_node = self.op.src_node
4698       src_path = self.op.src_path
4699
4700       if src_node is None:
4701         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4702         exp_list = self.rpc.call_export_list(locked_nodes)
4703         found = False
4704         for node in exp_list:
4705           if exp_list[node].RemoteFailMsg():
4706             continue
4707           if src_path in exp_list[node].payload:
4708             found = True
4709             self.op.src_node = src_node = node
4710             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4711                                                        src_path)
4712             break
4713         if not found:
4714           raise errors.OpPrereqError("No export found for relative path %s" %
4715                                       src_path)
4716
4717       _CheckNodeOnline(self, src_node)
4718       result = self.rpc.call_export_info(src_node, src_path)
4719       msg = result.RemoteFailMsg()
4720       if msg:
4721         raise errors.OpPrereqError("No export or invalid export found in"
4722                                    " dir %s: %s" % (src_path, msg))
4723
4724       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4725       if not export_info.has_section(constants.INISECT_EXP):
4726         raise errors.ProgrammerError("Corrupted export config")
4727
4728       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4729       if (int(ei_version) != constants.EXPORT_VERSION):
4730         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4731                                    (ei_version, constants.EXPORT_VERSION))
4732
4733       # Check that the new instance doesn't have less disks than the export
4734       instance_disks = len(self.disks)
4735       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4736       if instance_disks < export_disks:
4737         raise errors.OpPrereqError("Not enough disks to import."
4738                                    " (instance: %d, export: %d)" %
4739                                    (instance_disks, export_disks))
4740
4741       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4742       disk_images = []
4743       for idx in range(export_disks):
4744         option = 'disk%d_dump' % idx
4745         if export_info.has_option(constants.INISECT_INS, option):
4746           # FIXME: are the old os-es, disk sizes, etc. useful?
4747           export_name = export_info.get(constants.INISECT_INS, option)
4748           image = os.path.join(src_path, export_name)
4749           disk_images.append(image)
4750         else:
4751           disk_images.append(False)
4752
4753       self.src_images = disk_images
4754
4755       old_name = export_info.get(constants.INISECT_INS, 'name')
4756       # FIXME: int() here could throw a ValueError on broken exports
4757       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4758       if self.op.instance_name == old_name:
4759         for idx, nic in enumerate(self.nics):
4760           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4761             nic_mac_ini = 'nic%d_mac' % idx
4762             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4763
4764     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4765     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4766     if self.op.start and not self.op.ip_check:
4767       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4768                                  " adding an instance in start mode")
4769
4770     if self.op.ip_check:
4771       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4772         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4773                                    (self.check_ip, self.op.instance_name))
4774
4775     #### mac address generation
4776     # By generating here the mac address both the allocator and the hooks get
4777     # the real final mac address rather than the 'auto' or 'generate' value.
4778     # There is a race condition between the generation and the instance object
4779     # creation, which means that we know the mac is valid now, but we're not
4780     # sure it will be when we actually add the instance. If things go bad
4781     # adding the instance will abort because of a duplicate mac, and the
4782     # creation job will fail.
4783     for nic in self.nics:
4784       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4785         nic.mac = self.cfg.GenerateMAC()
4786
4787     #### allocator run
4788
4789     if self.op.iallocator is not None:
4790       self._RunAllocator()
4791
4792     #### node related checks
4793
4794     # check primary node
4795     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4796     assert self.pnode is not None, \
4797       "Cannot retrieve locked node %s" % self.op.pnode
4798     if pnode.offline:
4799       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4800                                  pnode.name)
4801     if pnode.drained:
4802       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4803                                  pnode.name)
4804
4805     self.secondaries = []
4806
4807     # mirror node verification
4808     if self.op.disk_template in constants.DTS_NET_MIRROR:
4809       if self.op.snode is None:
4810         raise errors.OpPrereqError("The networked disk templates need"
4811                                    " a mirror node")
4812       if self.op.snode == pnode.name:
4813         raise errors.OpPrereqError("The secondary node cannot be"
4814                                    " the primary node.")
4815       _CheckNodeOnline(self, self.op.snode)
4816       _CheckNodeNotDrained(self, self.op.snode)
4817       self.secondaries.append(self.op.snode)
4818
4819     nodenames = [pnode.name] + self.secondaries
4820
4821     req_size = _ComputeDiskSize(self.op.disk_template,
4822                                 self.disks)
4823
4824     # Check lv size requirements
4825     if req_size is not None:
4826       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4827                                          self.op.hypervisor)
4828       for node in nodenames:
4829         info = nodeinfo[node]
4830         msg = info.RemoteFailMsg()
4831         if msg:
4832           raise errors.OpPrereqError("Cannot get current information"
4833                                      " from node %s: %s" % (node, msg))
4834         info = info.payload
4835         vg_free = info.get('vg_free', None)
4836         if not isinstance(vg_free, int):
4837           raise errors.OpPrereqError("Can't compute free disk space on"
4838                                      " node %s" % node)
4839         if req_size > vg_free:
4840           raise errors.OpPrereqError("Not enough disk space on target node %s."
4841                                      " %d MB available, %d MB required" %
4842                                      (node, vg_free, req_size))
4843
4844     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4845
4846     # os verification
4847     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4848     msg = result.RemoteFailMsg()
4849     if msg:
4850       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4851                                  " primary node %s: %s"  %
4852                                  (self.op.os_type, pnode.name, msg))
4853
4854     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4855
4856     # memory check on primary node
4857     if self.op.start:
4858       _CheckNodeFreeMemory(self, self.pnode.name,
4859                            "creating instance %s" % self.op.instance_name,
4860                            self.be_full[constants.BE_MEMORY],
4861                            self.op.hypervisor)
4862
4863   def Exec(self, feedback_fn):
4864     """Create and add the instance to the cluster.
4865
4866     """
4867     instance = self.op.instance_name
4868     pnode_name = self.pnode.name
4869
4870     ht_kind = self.op.hypervisor
4871     if ht_kind in constants.HTS_REQ_PORT:
4872       network_port = self.cfg.AllocatePort()
4873     else:
4874       network_port = None
4875
4876     ##if self.op.vnc_bind_address is None:
4877     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4878
4879     # this is needed because os.path.join does not accept None arguments
4880     if self.op.file_storage_dir is None:
4881       string_file_storage_dir = ""
4882     else:
4883       string_file_storage_dir = self.op.file_storage_dir
4884
4885     # build the full file storage dir path
4886     file_storage_dir = os.path.normpath(os.path.join(
4887                                         self.cfg.GetFileStorageDir(),
4888                                         string_file_storage_dir, instance))
4889
4890
4891     disks = _GenerateDiskTemplate(self,
4892                                   self.op.disk_template,
4893                                   instance, pnode_name,
4894                                   self.secondaries,
4895                                   self.disks,
4896                                   file_storage_dir,
4897                                   self.op.file_driver,
4898                                   0)
4899
4900     iobj = objects.Instance(name=instance, os=self.op.os_type,
4901                             primary_node=pnode_name,
4902                             nics=self.nics, disks=disks,
4903                             disk_template=self.op.disk_template,
4904                             admin_up=False,
4905                             network_port=network_port,
4906                             beparams=self.op.beparams,
4907                             hvparams=self.op.hvparams,
4908                             hypervisor=self.op.hypervisor,
4909                             )
4910
4911     feedback_fn("* creating instance disks...")
4912     try:
4913       _CreateDisks(self, iobj)
4914     except errors.OpExecError:
4915       self.LogWarning("Device creation failed, reverting...")
4916       try:
4917         _RemoveDisks(self, iobj)
4918       finally:
4919         self.cfg.ReleaseDRBDMinors(instance)
4920         raise
4921
4922     feedback_fn("adding instance %s to cluster config" % instance)
4923
4924     self.cfg.AddInstance(iobj)
4925     # Declare that we don't want to remove the instance lock anymore, as we've
4926     # added the instance to the config
4927     del self.remove_locks[locking.LEVEL_INSTANCE]
4928     # Unlock all the nodes
4929     if self.op.mode == constants.INSTANCE_IMPORT:
4930       nodes_keep = [self.op.src_node]
4931       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4932                        if node != self.op.src_node]
4933       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4934       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4935     else:
4936       self.context.glm.release(locking.LEVEL_NODE)
4937       del self.acquired_locks[locking.LEVEL_NODE]
4938
4939     if self.op.wait_for_sync:
4940       disk_abort = not _WaitForSync(self, iobj)
4941     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4942       # make sure the disks are not degraded (still sync-ing is ok)
4943       time.sleep(15)
4944       feedback_fn("* checking mirrors status")
4945       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4946     else:
4947       disk_abort = False
4948
4949     if disk_abort:
4950       _RemoveDisks(self, iobj)
4951       self.cfg.RemoveInstance(iobj.name)
4952       # Make sure the instance lock gets removed
4953       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4954       raise errors.OpExecError("There are some degraded disks for"
4955                                " this instance")
4956
4957     feedback_fn("creating os for instance %s on node %s" %
4958                 (instance, pnode_name))
4959
4960     if iobj.disk_template != constants.DT_DISKLESS:
4961       if self.op.mode == constants.INSTANCE_CREATE:
4962         feedback_fn("* running the instance OS create scripts...")
4963         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4964         msg = result.RemoteFailMsg()
4965         if msg:
4966           raise errors.OpExecError("Could not add os for instance %s"
4967                                    " on node %s: %s" %
4968                                    (instance, pnode_name, msg))
4969
4970       elif self.op.mode == constants.INSTANCE_IMPORT:
4971         feedback_fn("* running the instance OS import scripts...")
4972         src_node = self.op.src_node
4973         src_images = self.src_images
4974         cluster_name = self.cfg.GetClusterName()
4975         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4976                                                          src_node, src_images,
4977                                                          cluster_name)
4978         msg = import_result.RemoteFailMsg()
4979         if msg:
4980           self.LogWarning("Error while importing the disk images for instance"
4981                           " %s on node %s: %s" % (instance, pnode_name, msg))
4982       else:
4983         # also checked in the prereq part
4984         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4985                                      % self.op.mode)
4986
4987     if self.op.start:
4988       iobj.admin_up = True
4989       self.cfg.Update(iobj)
4990       logging.info("Starting instance %s on node %s", instance, pnode_name)
4991       feedback_fn("* starting instance...")
4992       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4993       msg = result.RemoteFailMsg()
4994       if msg:
4995         raise errors.OpExecError("Could not start instance: %s" % msg)
4996
4997
4998 class LUConnectConsole(NoHooksLU):
4999   """Connect to an instance's console.
5000
5001   This is somewhat special in that it returns the command line that
5002   you need to run on the master node in order to connect to the
5003   console.
5004
5005   """
5006   _OP_REQP = ["instance_name"]
5007   REQ_BGL = False
5008
5009   def ExpandNames(self):
5010     self._ExpandAndLockInstance()
5011
5012   def CheckPrereq(self):
5013     """Check prerequisites.
5014
5015     This checks that the instance is in the cluster.
5016
5017     """
5018     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5019     assert self.instance is not None, \
5020       "Cannot retrieve locked instance %s" % self.op.instance_name
5021     _CheckNodeOnline(self, self.instance.primary_node)
5022
5023   def Exec(self, feedback_fn):
5024     """Connect to the console of an instance
5025
5026     """
5027     instance = self.instance
5028     node = instance.primary_node
5029
5030     node_insts = self.rpc.call_instance_list([node],
5031                                              [instance.hypervisor])[node]
5032     msg = node_insts.RemoteFailMsg()
5033     if msg:
5034       raise errors.OpExecError("Can't get node information from %s: %s" %
5035                                (node, msg))
5036
5037     if instance.name not in node_insts.payload:
5038       raise errors.OpExecError("Instance %s is not running." % instance.name)
5039
5040     logging.debug("Connecting to console of %s on %s", instance.name, node)
5041
5042     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5043     cluster = self.cfg.GetClusterInfo()
5044     # beparams and hvparams are passed separately, to avoid editing the
5045     # instance and then saving the defaults in the instance itself.
5046     hvparams = cluster.FillHV(instance)
5047     beparams = cluster.FillBE(instance)
5048     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5049
5050     # build ssh cmdline
5051     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5052
5053
5054 class LUReplaceDisks(LogicalUnit):
5055   """Replace the disks of an instance.
5056
5057   """
5058   HPATH = "mirrors-replace"
5059   HTYPE = constants.HTYPE_INSTANCE
5060   _OP_REQP = ["instance_name", "mode", "disks"]
5061   REQ_BGL = False
5062
5063   def CheckArguments(self):
5064     if not hasattr(self.op, "remote_node"):
5065       self.op.remote_node = None
5066     if not hasattr(self.op, "iallocator"):
5067       self.op.iallocator = None
5068
5069     # check for valid parameter combination
5070     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5071     if self.op.mode == constants.REPLACE_DISK_CHG:
5072       if cnt == 2:
5073         raise errors.OpPrereqError("When changing the secondary either an"
5074                                    " iallocator script must be used or the"
5075                                    " new node given")
5076       elif cnt == 0:
5077         raise errors.OpPrereqError("Give either the iallocator or the new"
5078                                    " secondary, not both")
5079     else: # not replacing the secondary
5080       if cnt != 2:
5081         raise errors.OpPrereqError("The iallocator and new node options can"
5082                                    " be used only when changing the"
5083                                    " secondary node")
5084
5085   def ExpandNames(self):
5086     self._ExpandAndLockInstance()
5087
5088     if self.op.iallocator is not None:
5089       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5090     elif self.op.remote_node is not None:
5091       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5092       if remote_node is None:
5093         raise errors.OpPrereqError("Node '%s' not known" %
5094                                    self.op.remote_node)
5095       self.op.remote_node = remote_node
5096       # Warning: do not remove the locking of the new secondary here
5097       # unless DRBD8.AddChildren is changed to work in parallel;
5098       # currently it doesn't since parallel invocations of
5099       # FindUnusedMinor will conflict
5100       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5101       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5102     else:
5103       self.needed_locks[locking.LEVEL_NODE] = []
5104       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5105
5106   def DeclareLocks(self, level):
5107     # If we're not already locking all nodes in the set we have to declare the
5108     # instance's primary/secondary nodes.
5109     if (level == locking.LEVEL_NODE and
5110         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5111       self._LockInstancesNodes()
5112
5113   def _RunAllocator(self):
5114     """Compute a new secondary node using an IAllocator.
5115
5116     """
5117     ial = IAllocator(self,
5118                      mode=constants.IALLOCATOR_MODE_RELOC,
5119                      name=self.op.instance_name,
5120                      relocate_from=[self.sec_node])
5121
5122     ial.Run(self.op.iallocator)
5123
5124     if not ial.success:
5125       raise errors.OpPrereqError("Can't compute nodes using"
5126                                  " iallocator '%s': %s" % (self.op.iallocator,
5127                                                            ial.info))
5128     if len(ial.nodes) != ial.required_nodes:
5129       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5130                                  " of nodes (%s), required %s" %
5131                                  (len(ial.nodes), ial.required_nodes))
5132     self.op.remote_node = ial.nodes[0]
5133     self.LogInfo("Selected new secondary for the instance: %s",
5134                  self.op.remote_node)
5135
5136   def BuildHooksEnv(self):
5137     """Build hooks env.
5138
5139     This runs on the master, the primary and all the secondaries.
5140
5141     """
5142     env = {
5143       "MODE": self.op.mode,
5144       "NEW_SECONDARY": self.op.remote_node,
5145       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5146       }
5147     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5148     nl = [
5149       self.cfg.GetMasterNode(),
5150       self.instance.primary_node,
5151       ]
5152     if self.op.remote_node is not None:
5153       nl.append(self.op.remote_node)
5154     return env, nl, nl
5155
5156   def CheckPrereq(self):
5157     """Check prerequisites.
5158
5159     This checks that the instance is in the cluster.
5160
5161     """
5162     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5163     assert instance is not None, \
5164       "Cannot retrieve locked instance %s" % self.op.instance_name
5165     self.instance = instance
5166
5167     if instance.disk_template != constants.DT_DRBD8:
5168       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5169                                  " instances")
5170
5171     if len(instance.secondary_nodes) != 1:
5172       raise errors.OpPrereqError("The instance has a strange layout,"
5173                                  " expected one secondary but found %d" %
5174                                  len(instance.secondary_nodes))
5175
5176     self.sec_node = instance.secondary_nodes[0]
5177
5178     if self.op.iallocator is not None:
5179       self._RunAllocator()
5180
5181     remote_node = self.op.remote_node
5182     if remote_node is not None:
5183       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5184       assert self.remote_node_info is not None, \
5185         "Cannot retrieve locked node %s" % remote_node
5186     else:
5187       self.remote_node_info = None
5188     if remote_node == instance.primary_node:
5189       raise errors.OpPrereqError("The specified node is the primary node of"
5190                                  " the instance.")
5191     elif remote_node == self.sec_node:
5192       raise errors.OpPrereqError("The specified node is already the"
5193                                  " secondary node of the instance.")
5194
5195     if self.op.mode == constants.REPLACE_DISK_PRI:
5196       n1 = self.tgt_node = instance.primary_node
5197       n2 = self.oth_node = self.sec_node
5198     elif self.op.mode == constants.REPLACE_DISK_SEC:
5199       n1 = self.tgt_node = self.sec_node
5200       n2 = self.oth_node = instance.primary_node
5201     elif self.op.mode == constants.REPLACE_DISK_CHG:
5202       n1 = self.new_node = remote_node
5203       n2 = self.oth_node = instance.primary_node
5204       self.tgt_node = self.sec_node
5205       _CheckNodeNotDrained(self, remote_node)
5206     else:
5207       raise errors.ProgrammerError("Unhandled disk replace mode")
5208
5209     _CheckNodeOnline(self, n1)
5210     _CheckNodeOnline(self, n2)
5211
5212     if not self.op.disks:
5213       self.op.disks = range(len(instance.disks))
5214
5215     for disk_idx in self.op.disks:
5216       instance.FindDisk(disk_idx)
5217
5218   def _ExecD8DiskOnly(self, feedback_fn):
5219     """Replace a disk on the primary or secondary for dbrd8.
5220
5221     The algorithm for replace is quite complicated:
5222
5223       1. for each disk to be replaced:
5224
5225         1. create new LVs on the target node with unique names
5226         1. detach old LVs from the drbd device
5227         1. rename old LVs to name_replaced.<time_t>
5228         1. rename new LVs to old LVs
5229         1. attach the new LVs (with the old names now) to the drbd device
5230
5231       1. wait for sync across all devices
5232
5233       1. for each modified disk:
5234
5235         1. remove old LVs (which have the name name_replaces.<time_t>)
5236
5237     Failures are not very well handled.
5238
5239     """
5240     steps_total = 6
5241     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5242     instance = self.instance
5243     iv_names = {}
5244     vgname = self.cfg.GetVGName()
5245     # start of work
5246     cfg = self.cfg
5247     tgt_node = self.tgt_node
5248     oth_node = self.oth_node
5249
5250     # Step: check device activation
5251     self.proc.LogStep(1, steps_total, "check device existence")
5252     info("checking volume groups")
5253     my_vg = cfg.GetVGName()
5254     results = self.rpc.call_vg_list([oth_node, tgt_node])
5255     if not results:
5256       raise errors.OpExecError("Can't list volume groups on the nodes")
5257     for node in oth_node, tgt_node:
5258       res = results[node]
5259       msg = res.RemoteFailMsg()
5260       if msg:
5261         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5262       if my_vg not in res.payload:
5263         raise errors.OpExecError("Volume group '%s' not found on %s" %
5264                                  (my_vg, node))
5265     for idx, dev in enumerate(instance.disks):
5266       if idx not in self.op.disks:
5267         continue
5268       for node in tgt_node, oth_node:
5269         info("checking disk/%d on %s" % (idx, node))
5270         cfg.SetDiskID(dev, node)
5271         result = self.rpc.call_blockdev_find(node, dev)
5272         msg = result.RemoteFailMsg()
5273         if not msg and not result.payload:
5274           msg = "disk not found"
5275         if msg:
5276           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5277                                    (idx, node, msg))
5278
5279     # Step: check other node consistency
5280     self.proc.LogStep(2, steps_total, "check peer consistency")
5281     for idx, dev in enumerate(instance.disks):
5282       if idx not in self.op.disks:
5283         continue
5284       info("checking disk/%d consistency on %s" % (idx, oth_node))
5285       if not _CheckDiskConsistency(self, dev, oth_node,
5286                                    oth_node==instance.primary_node):
5287         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5288                                  " to replace disks on this node (%s)" %
5289                                  (oth_node, tgt_node))
5290
5291     # Step: create new storage
5292     self.proc.LogStep(3, steps_total, "allocate new storage")
5293     for idx, dev in enumerate(instance.disks):
5294       if idx not in self.op.disks:
5295         continue
5296       size = dev.size
5297       cfg.SetDiskID(dev, tgt_node)
5298       lv_names = [".disk%d_%s" % (idx, suf)
5299                   for suf in ["data", "meta"]]
5300       names = _GenerateUniqueNames(self, lv_names)
5301       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5302                              logical_id=(vgname, names[0]))
5303       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5304                              logical_id=(vgname, names[1]))
5305       new_lvs = [lv_data, lv_meta]
5306       old_lvs = dev.children
5307       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5308       info("creating new local storage on %s for %s" %
5309            (tgt_node, dev.iv_name))
5310       # we pass force_create=True to force the LVM creation
5311       for new_lv in new_lvs:
5312         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5313                         _GetInstanceInfoText(instance), False)
5314
5315     # Step: for each lv, detach+rename*2+attach
5316     self.proc.LogStep(4, steps_total, "change drbd configuration")
5317     for dev, old_lvs, new_lvs in iv_names.itervalues():
5318       info("detaching %s drbd from local storage" % dev.iv_name)
5319       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5320       msg = result.RemoteFailMsg()
5321       if msg:
5322         raise errors.OpExecError("Can't detach drbd from local storage on node"
5323                                  " %s for device %s: %s" %
5324                                  (tgt_node, dev.iv_name, msg))
5325       #dev.children = []
5326       #cfg.Update(instance)
5327
5328       # ok, we created the new LVs, so now we know we have the needed
5329       # storage; as such, we proceed on the target node to rename
5330       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5331       # using the assumption that logical_id == physical_id (which in
5332       # turn is the unique_id on that node)
5333
5334       # FIXME(iustin): use a better name for the replaced LVs
5335       temp_suffix = int(time.time())
5336       ren_fn = lambda d, suff: (d.physical_id[0],
5337                                 d.physical_id[1] + "_replaced-%s" % suff)
5338       # build the rename list based on what LVs exist on the node
5339       rlist = []
5340       for to_ren in old_lvs:
5341         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5342         if not result.RemoteFailMsg() and result.payload:
5343           # device exists
5344           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5345
5346       info("renaming the old LVs on the target node")
5347       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5348       msg = result.RemoteFailMsg()
5349       if msg:
5350         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5351                                  (tgt_node, msg))
5352       # now we rename the new LVs to the old LVs
5353       info("renaming the new LVs on the target node")
5354       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5355       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5356       msg = result.RemoteFailMsg()
5357       if msg:
5358         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5359                                  (tgt_node, msg))
5360
5361       for old, new in zip(old_lvs, new_lvs):
5362         new.logical_id = old.logical_id
5363         cfg.SetDiskID(new, tgt_node)
5364
5365       for disk in old_lvs:
5366         disk.logical_id = ren_fn(disk, temp_suffix)
5367         cfg.SetDiskID(disk, tgt_node)
5368
5369       # now that the new lvs have the old name, we can add them to the device
5370       info("adding new mirror component on %s" % tgt_node)
5371       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5372       msg = result.RemoteFailMsg()
5373       if msg:
5374         for new_lv in new_lvs:
5375           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5376           if msg:
5377             warning("Can't rollback device %s: %s", dev, msg,
5378                     hint="cleanup manually the unused logical volumes")
5379         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5380
5381       dev.children = new_lvs
5382       cfg.Update(instance)
5383
5384     # Step: wait for sync
5385
5386     # this can fail as the old devices are degraded and _WaitForSync
5387     # does a combined result over all disks, so we don't check its
5388     # return value
5389     self.proc.LogStep(5, steps_total, "sync devices")
5390     _WaitForSync(self, instance, unlock=True)
5391
5392     # so check manually all the devices
5393     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5394       cfg.SetDiskID(dev, instance.primary_node)
5395       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5396       msg = result.RemoteFailMsg()
5397       if not msg and not result.payload:
5398         msg = "disk not found"
5399       if msg:
5400         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5401                                  (name, msg))
5402       if result.payload[5]:
5403         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5404
5405     # Step: remove old storage
5406     self.proc.LogStep(6, steps_total, "removing old storage")
5407     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5408       info("remove logical volumes for %s" % name)
5409       for lv in old_lvs:
5410         cfg.SetDiskID(lv, tgt_node)
5411         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5412         if msg:
5413           warning("Can't remove old LV: %s" % msg,
5414                   hint="manually remove unused LVs")
5415           continue
5416
5417   def _ExecD8Secondary(self, feedback_fn):
5418     """Replace the secondary node for drbd8.
5419
5420     The algorithm for replace is quite complicated:
5421       - for all disks of the instance:
5422         - create new LVs on the new node with same names
5423         - shutdown the drbd device on the old secondary
5424         - disconnect the drbd network on the primary
5425         - create the drbd device on the new secondary
5426         - network attach the drbd on the primary, using an artifice:
5427           the drbd code for Attach() will connect to the network if it
5428           finds a device which is connected to the good local disks but
5429           not network enabled
5430       - wait for sync across all devices
5431       - remove all disks from the old secondary
5432
5433     Failures are not very well handled.
5434
5435     """
5436     steps_total = 6
5437     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5438     instance = self.instance
5439     iv_names = {}
5440     # start of work
5441     cfg = self.cfg
5442     old_node = self.tgt_node
5443     new_node = self.new_node
5444     pri_node = instance.primary_node
5445     nodes_ip = {
5446       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5447       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5448       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5449       }
5450
5451     # Step: check device activation
5452     self.proc.LogStep(1, steps_total, "check device existence")
5453     info("checking volume groups")
5454     my_vg = cfg.GetVGName()
5455     results = self.rpc.call_vg_list([pri_node, new_node])
5456     for node in pri_node, new_node:
5457       res = results[node]
5458       msg = res.RemoteFailMsg()
5459       if msg:
5460         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5461       if my_vg not in res.payload:
5462         raise errors.OpExecError("Volume group '%s' not found on %s" %
5463                                  (my_vg, node))
5464     for idx, dev in enumerate(instance.disks):
5465       if idx not in self.op.disks:
5466         continue
5467       info("checking disk/%d on %s" % (idx, pri_node))
5468       cfg.SetDiskID(dev, pri_node)
5469       result = self.rpc.call_blockdev_find(pri_node, dev)
5470       msg = result.RemoteFailMsg()
5471       if not msg and not result.payload:
5472         msg = "disk not found"
5473       if msg:
5474         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5475                                  (idx, pri_node, msg))
5476
5477     # Step: check other node consistency
5478     self.proc.LogStep(2, steps_total, "check peer consistency")
5479     for idx, dev in enumerate(instance.disks):
5480       if idx not in self.op.disks:
5481         continue
5482       info("checking disk/%d consistency on %s" % (idx, pri_node))
5483       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5484         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5485                                  " unsafe to replace the secondary" %
5486                                  pri_node)
5487
5488     # Step: create new storage
5489     self.proc.LogStep(3, steps_total, "allocate new storage")
5490     for idx, dev in enumerate(instance.disks):
5491       info("adding new local storage on %s for disk/%d" %
5492            (new_node, idx))
5493       # we pass force_create=True to force LVM creation
5494       for new_lv in dev.children:
5495         _CreateBlockDev(self, new_node, instance, new_lv, True,
5496                         _GetInstanceInfoText(instance), False)
5497
5498     # Step 4: dbrd minors and drbd setups changes
5499     # after this, we must manually remove the drbd minors on both the
5500     # error and the success paths
5501     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5502                                    instance.name)
5503     logging.debug("Allocated minors %s" % (minors,))
5504     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5505     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5506       size = dev.size
5507       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5508       # create new devices on new_node; note that we create two IDs:
5509       # one without port, so the drbd will be activated without
5510       # networking information on the new node at this stage, and one
5511       # with network, for the latter activation in step 4
5512       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5513       if pri_node == o_node1:
5514         p_minor = o_minor1
5515       else:
5516         p_minor = o_minor2
5517
5518       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5519       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5520
5521       iv_names[idx] = (dev, dev.children, new_net_id)
5522       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5523                     new_net_id)
5524       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5525                               logical_id=new_alone_id,
5526                               children=dev.children)
5527       try:
5528         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5529                               _GetInstanceInfoText(instance), False)
5530       except errors.GenericError:
5531         self.cfg.ReleaseDRBDMinors(instance.name)
5532         raise
5533
5534     for idx, dev in enumerate(instance.disks):
5535       # we have new devices, shutdown the drbd on the old secondary
5536       info("shutting down drbd for disk/%d on old node" % idx)
5537       cfg.SetDiskID(dev, old_node)
5538       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5539       if msg:
5540         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5541                 (idx, msg),
5542                 hint="Please cleanup this device manually as soon as possible")
5543
5544     info("detaching primary drbds from the network (=> standalone)")
5545     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5546                                                instance.disks)[pri_node]
5547
5548     msg = result.RemoteFailMsg()
5549     if msg:
5550       # detaches didn't succeed (unlikely)
5551       self.cfg.ReleaseDRBDMinors(instance.name)
5552       raise errors.OpExecError("Can't detach the disks from the network on"
5553                                " old node: %s" % (msg,))
5554
5555     # if we managed to detach at least one, we update all the disks of
5556     # the instance to point to the new secondary
5557     info("updating instance configuration")
5558     for dev, _, new_logical_id in iv_names.itervalues():
5559       dev.logical_id = new_logical_id
5560       cfg.SetDiskID(dev, pri_node)
5561     cfg.Update(instance)
5562
5563     # and now perform the drbd attach
5564     info("attaching primary drbds to new secondary (standalone => connected)")
5565     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5566                                            instance.disks, instance.name,
5567                                            False)
5568     for to_node, to_result in result.items():
5569       msg = to_result.RemoteFailMsg()
5570       if msg:
5571         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5572                 hint="please do a gnt-instance info to see the"
5573                 " status of disks")
5574
5575     # this can fail as the old devices are degraded and _WaitForSync
5576     # does a combined result over all disks, so we don't check its
5577     # return value
5578     self.proc.LogStep(5, steps_total, "sync devices")
5579     _WaitForSync(self, instance, unlock=True)
5580
5581     # so check manually all the devices
5582     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5583       cfg.SetDiskID(dev, pri_node)
5584       result = self.rpc.call_blockdev_find(pri_node, dev)
5585       msg = result.RemoteFailMsg()
5586       if not msg and not result.payload:
5587         msg = "disk not found"
5588       if msg:
5589         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5590                                  (idx, msg))
5591       if result.payload[5]:
5592         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5593
5594     self.proc.LogStep(6, steps_total, "removing old storage")
5595     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5596       info("remove logical volumes for disk/%d" % idx)
5597       for lv in old_lvs:
5598         cfg.SetDiskID(lv, old_node)
5599         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5600         if msg:
5601           warning("Can't remove LV on old secondary: %s", msg,
5602                   hint="Cleanup stale volumes by hand")
5603
5604   def Exec(self, feedback_fn):
5605     """Execute disk replacement.
5606
5607     This dispatches the disk replacement to the appropriate handler.
5608
5609     """
5610     instance = self.instance
5611
5612     # Activate the instance disks if we're replacing them on a down instance
5613     if not instance.admin_up:
5614       _StartInstanceDisks(self, instance, True)
5615
5616     if self.op.mode == constants.REPLACE_DISK_CHG:
5617       fn = self._ExecD8Secondary
5618     else:
5619       fn = self._ExecD8DiskOnly
5620
5621     ret = fn(feedback_fn)
5622
5623     # Deactivate the instance disks if we're replacing them on a down instance
5624     if not instance.admin_up:
5625       _SafeShutdownInstanceDisks(self, instance)
5626
5627     return ret
5628
5629
5630 class LUGrowDisk(LogicalUnit):
5631   """Grow a disk of an instance.
5632
5633   """
5634   HPATH = "disk-grow"
5635   HTYPE = constants.HTYPE_INSTANCE
5636   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5637   REQ_BGL = False
5638
5639   def ExpandNames(self):
5640     self._ExpandAndLockInstance()
5641     self.needed_locks[locking.LEVEL_NODE] = []
5642     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5643
5644   def DeclareLocks(self, level):
5645     if level == locking.LEVEL_NODE:
5646       self._LockInstancesNodes()
5647
5648   def BuildHooksEnv(self):
5649     """Build hooks env.
5650
5651     This runs on the master, the primary and all the secondaries.
5652
5653     """
5654     env = {
5655       "DISK": self.op.disk,
5656       "AMOUNT": self.op.amount,
5657       }
5658     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5659     nl = [
5660       self.cfg.GetMasterNode(),
5661       self.instance.primary_node,
5662       ]
5663     return env, nl, nl
5664
5665   def CheckPrereq(self):
5666     """Check prerequisites.
5667
5668     This checks that the instance is in the cluster.
5669
5670     """
5671     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5672     assert instance is not None, \
5673       "Cannot retrieve locked instance %s" % self.op.instance_name
5674     nodenames = list(instance.all_nodes)
5675     for node in nodenames:
5676       _CheckNodeOnline(self, node)
5677
5678
5679     self.instance = instance
5680
5681     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5682       raise errors.OpPrereqError("Instance's disk layout does not support"
5683                                  " growing.")
5684
5685     self.disk = instance.FindDisk(self.op.disk)
5686
5687     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5688                                        instance.hypervisor)
5689     for node in nodenames:
5690       info = nodeinfo[node]
5691       msg = info.RemoteFailMsg()
5692       if msg:
5693         raise errors.OpPrereqError("Cannot get current information"
5694                                    " from node %s:" % (node, msg))
5695       vg_free = info.payload.get('vg_free', None)
5696       if not isinstance(vg_free, int):
5697         raise errors.OpPrereqError("Can't compute free disk space on"
5698                                    " node %s" % node)
5699       if self.op.amount > vg_free:
5700         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5701                                    " %d MiB available, %d MiB required" %
5702                                    (node, vg_free, self.op.amount))
5703
5704   def Exec(self, feedback_fn):
5705     """Execute disk grow.
5706
5707     """
5708     instance = self.instance
5709     disk = self.disk
5710     for node in instance.all_nodes:
5711       self.cfg.SetDiskID(disk, node)
5712       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5713       msg = result.RemoteFailMsg()
5714       if msg:
5715         raise errors.OpExecError("Grow request failed to node %s: %s" %
5716                                  (node, msg))
5717     disk.RecordGrow(self.op.amount)
5718     self.cfg.Update(instance)
5719     if self.op.wait_for_sync:
5720       disk_abort = not _WaitForSync(self, instance)
5721       if disk_abort:
5722         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5723                              " status.\nPlease check the instance.")
5724
5725
5726 class LUQueryInstanceData(NoHooksLU):
5727   """Query runtime instance data.
5728
5729   """
5730   _OP_REQP = ["instances", "static"]
5731   REQ_BGL = False
5732
5733   def ExpandNames(self):
5734     self.needed_locks = {}
5735     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5736
5737     if not isinstance(self.op.instances, list):
5738       raise errors.OpPrereqError("Invalid argument type 'instances'")
5739
5740     if self.op.instances:
5741       self.wanted_names = []
5742       for name in self.op.instances:
5743         full_name = self.cfg.ExpandInstanceName(name)
5744         if full_name is None:
5745           raise errors.OpPrereqError("Instance '%s' not known" % name)
5746         self.wanted_names.append(full_name)
5747       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5748     else:
5749       self.wanted_names = None
5750       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5751
5752     self.needed_locks[locking.LEVEL_NODE] = []
5753     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5754
5755   def DeclareLocks(self, level):
5756     if level == locking.LEVEL_NODE:
5757       self._LockInstancesNodes()
5758
5759   def CheckPrereq(self):
5760     """Check prerequisites.
5761
5762     This only checks the optional instance list against the existing names.
5763
5764     """
5765     if self.wanted_names is None:
5766       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5767
5768     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5769                              in self.wanted_names]
5770     return
5771
5772   def _ComputeDiskStatus(self, instance, snode, dev):
5773     """Compute block device status.
5774
5775     """
5776     static = self.op.static
5777     if not static:
5778       self.cfg.SetDiskID(dev, instance.primary_node)
5779       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5780       if dev_pstatus.offline:
5781         dev_pstatus = None
5782       else:
5783         msg = dev_pstatus.RemoteFailMsg()
5784         if msg:
5785           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5786                                    (instance.name, msg))
5787         dev_pstatus = dev_pstatus.payload
5788     else:
5789       dev_pstatus = None
5790
5791     if dev.dev_type in constants.LDS_DRBD:
5792       # we change the snode then (otherwise we use the one passed in)
5793       if dev.logical_id[0] == instance.primary_node:
5794         snode = dev.logical_id[1]
5795       else:
5796         snode = dev.logical_id[0]
5797
5798     if snode and not static:
5799       self.cfg.SetDiskID(dev, snode)
5800       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5801       if dev_sstatus.offline:
5802         dev_sstatus = None
5803       else:
5804         msg = dev_sstatus.RemoteFailMsg()
5805         if msg:
5806           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5807                                    (instance.name, msg))
5808         dev_sstatus = dev_sstatus.payload
5809     else:
5810       dev_sstatus = None
5811
5812     if dev.children:
5813       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5814                       for child in dev.children]
5815     else:
5816       dev_children = []
5817
5818     data = {
5819       "iv_name": dev.iv_name,
5820       "dev_type": dev.dev_type,
5821       "logical_id": dev.logical_id,
5822       "physical_id": dev.physical_id,
5823       "pstatus": dev_pstatus,
5824       "sstatus": dev_sstatus,
5825       "children": dev_children,
5826       "mode": dev.mode,
5827       }
5828
5829     return data
5830
5831   def Exec(self, feedback_fn):
5832     """Gather and return data"""
5833     result = {}
5834
5835     cluster = self.cfg.GetClusterInfo()
5836
5837     for instance in self.wanted_instances:
5838       if not self.op.static:
5839         remote_info = self.rpc.call_instance_info(instance.primary_node,
5840                                                   instance.name,
5841                                                   instance.hypervisor)
5842         msg = remote_info.RemoteFailMsg()
5843         if msg:
5844           raise errors.OpExecError("Error checking node %s: %s" %
5845                                    (instance.primary_node, msg))
5846         remote_info = remote_info.payload
5847         if remote_info and "state" in remote_info:
5848           remote_state = "up"
5849         else:
5850           remote_state = "down"
5851       else:
5852         remote_state = None
5853       if instance.admin_up:
5854         config_state = "up"
5855       else:
5856         config_state = "down"
5857
5858       disks = [self._ComputeDiskStatus(instance, None, device)
5859                for device in instance.disks]
5860
5861       idict = {
5862         "name": instance.name,
5863         "config_state": config_state,
5864         "run_state": remote_state,
5865         "pnode": instance.primary_node,
5866         "snodes": instance.secondary_nodes,
5867         "os": instance.os,
5868         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5869         "disks": disks,
5870         "hypervisor": instance.hypervisor,
5871         "network_port": instance.network_port,
5872         "hv_instance": instance.hvparams,
5873         "hv_actual": cluster.FillHV(instance),
5874         "be_instance": instance.beparams,
5875         "be_actual": cluster.FillBE(instance),
5876         }
5877
5878       result[instance.name] = idict
5879
5880     return result
5881
5882
5883 class LUSetInstanceParams(LogicalUnit):
5884   """Modifies an instances's parameters.
5885
5886   """
5887   HPATH = "instance-modify"
5888   HTYPE = constants.HTYPE_INSTANCE
5889   _OP_REQP = ["instance_name"]
5890   REQ_BGL = False
5891
5892   def CheckArguments(self):
5893     if not hasattr(self.op, 'nics'):
5894       self.op.nics = []
5895     if not hasattr(self.op, 'disks'):
5896       self.op.disks = []
5897     if not hasattr(self.op, 'beparams'):
5898       self.op.beparams = {}
5899     if not hasattr(self.op, 'hvparams'):
5900       self.op.hvparams = {}
5901     self.op.force = getattr(self.op, "force", False)
5902     if not (self.op.nics or self.op.disks or
5903             self.op.hvparams or self.op.beparams):
5904       raise errors.OpPrereqError("No changes submitted")
5905
5906     # Disk validation
5907     disk_addremove = 0
5908     for disk_op, disk_dict in self.op.disks:
5909       if disk_op == constants.DDM_REMOVE:
5910         disk_addremove += 1
5911         continue
5912       elif disk_op == constants.DDM_ADD:
5913         disk_addremove += 1
5914       else:
5915         if not isinstance(disk_op, int):
5916           raise errors.OpPrereqError("Invalid disk index")
5917       if disk_op == constants.DDM_ADD:
5918         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5919         if mode not in constants.DISK_ACCESS_SET:
5920           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5921         size = disk_dict.get('size', None)
5922         if size is None:
5923           raise errors.OpPrereqError("Required disk parameter size missing")
5924         try:
5925           size = int(size)
5926         except ValueError, err:
5927           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5928                                      str(err))
5929         disk_dict['size'] = size
5930       else:
5931         # modification of disk
5932         if 'size' in disk_dict:
5933           raise errors.OpPrereqError("Disk size change not possible, use"
5934                                      " grow-disk")
5935
5936     if disk_addremove > 1:
5937       raise errors.OpPrereqError("Only one disk add or remove operation"
5938                                  " supported at a time")
5939
5940     # NIC validation
5941     nic_addremove = 0
5942     for nic_op, nic_dict in self.op.nics:
5943       if nic_op == constants.DDM_REMOVE:
5944         nic_addremove += 1
5945         continue
5946       elif nic_op == constants.DDM_ADD:
5947         nic_addremove += 1
5948       else:
5949         if not isinstance(nic_op, int):
5950           raise errors.OpPrereqError("Invalid nic index")
5951
5952       # nic_dict should be a dict
5953       nic_ip = nic_dict.get('ip', None)
5954       if nic_ip is not None:
5955         if nic_ip.lower() == constants.VALUE_NONE:
5956           nic_dict['ip'] = None
5957         else:
5958           if not utils.IsValidIP(nic_ip):
5959             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5960
5961       nic_bridge = nic_dict.get('bridge', None)
5962       nic_link = nic_dict.get('link', None)
5963       if nic_bridge and nic_link:
5964         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5965       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5966         nic_dict['bridge'] = None
5967       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5968         nic_dict['link'] = None
5969
5970       if nic_op == constants.DDM_ADD:
5971         nic_mac = nic_dict.get('mac', None)
5972         if nic_mac is None:
5973           nic_dict['mac'] = constants.VALUE_AUTO
5974
5975       if 'mac' in nic_dict:
5976         nic_mac = nic_dict['mac']
5977         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5978           if not utils.IsValidMac(nic_mac):
5979             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5980         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5981           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5982                                      " modifying an existing nic")
5983
5984     if nic_addremove > 1:
5985       raise errors.OpPrereqError("Only one NIC add or remove operation"
5986                                  " supported at a time")
5987
5988   def ExpandNames(self):
5989     self._ExpandAndLockInstance()
5990     self.needed_locks[locking.LEVEL_NODE] = []
5991     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5992
5993   def DeclareLocks(self, level):
5994     if level == locking.LEVEL_NODE:
5995       self._LockInstancesNodes()
5996
5997   def BuildHooksEnv(self):
5998     """Build hooks env.
5999
6000     This runs on the master, primary and secondaries.
6001
6002     """
6003     args = dict()
6004     if constants.BE_MEMORY in self.be_new:
6005       args['memory'] = self.be_new[constants.BE_MEMORY]
6006     if constants.BE_VCPUS in self.be_new:
6007       args['vcpus'] = self.be_new[constants.BE_VCPUS]
6008     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6009     # information at all.
6010     if self.op.nics:
6011       args['nics'] = []
6012       nic_override = dict(self.op.nics)
6013       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6014       for idx, nic in enumerate(self.instance.nics):
6015         if idx in nic_override:
6016           this_nic_override = nic_override[idx]
6017         else:
6018           this_nic_override = {}
6019         if 'ip' in this_nic_override:
6020           ip = this_nic_override['ip']
6021         else:
6022           ip = nic.ip
6023         if 'mac' in this_nic_override:
6024           mac = this_nic_override['mac']
6025         else:
6026           mac = nic.mac
6027         if idx in self.nic_pnew:
6028           nicparams = self.nic_pnew[idx]
6029         else:
6030           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6031         mode = nicparams[constants.NIC_MODE]
6032         link = nicparams[constants.NIC_LINK]
6033         args['nics'].append((ip, mac, mode, link))
6034       if constants.DDM_ADD in nic_override:
6035         ip = nic_override[constants.DDM_ADD].get('ip', None)
6036         mac = nic_override[constants.DDM_ADD]['mac']
6037         nicparams = self.nic_pnew[constants.DDM_ADD]
6038         mode = nicparams[constants.NIC_MODE]
6039         link = nicparams[constants.NIC_LINK]
6040         args['nics'].append((ip, mac, mode, link))
6041       elif constants.DDM_REMOVE in nic_override:
6042         del args['nics'][-1]
6043
6044     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6045     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6046     return env, nl, nl
6047
6048   def _GetUpdatedParams(self, old_params, update_dict,
6049                         default_values, parameter_types):
6050     """Return the new params dict for the given params.
6051
6052     @type old_params: dict
6053     @type old_params: old parameters
6054     @type update_dict: dict
6055     @type update_dict: dict containing new parameter values,
6056                        or constants.VALUE_DEFAULT to reset the
6057                        parameter to its default value
6058     @type default_values: dict
6059     @param default_values: default values for the filled parameters
6060     @type parameter_types: dict
6061     @param parameter_types: dict mapping target dict keys to types
6062                             in constants.ENFORCEABLE_TYPES
6063     @rtype: (dict, dict)
6064     @return: (new_parameters, filled_parameters)
6065
6066     """
6067     params_copy = copy.deepcopy(old_params)
6068     for key, val in update_dict.iteritems():
6069       if val == constants.VALUE_DEFAULT:
6070         try:
6071           del params_copy[key]
6072         except KeyError:
6073           pass
6074       else:
6075         params_copy[key] = val
6076     utils.ForceDictType(params_copy, parameter_types)
6077     params_filled = objects.FillDict(default_values, params_copy)
6078     return (params_copy, params_filled)
6079
6080   def CheckPrereq(self):
6081     """Check prerequisites.
6082
6083     This only checks the instance list against the existing names.
6084
6085     """
6086     force = self.force = self.op.force
6087
6088     # checking the new params on the primary/secondary nodes
6089
6090     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6091     cluster = self.cluster = self.cfg.GetClusterInfo()
6092     assert self.instance is not None, \
6093       "Cannot retrieve locked instance %s" % self.op.instance_name
6094     pnode = instance.primary_node
6095     nodelist = list(instance.all_nodes)
6096
6097     # hvparams processing
6098     if self.op.hvparams:
6099       i_hvdict, hv_new = self._GetUpdatedParams(
6100                              instance.hvparams, self.op.hvparams,
6101                              cluster.hvparams[instance.hypervisor],
6102                              constants.HVS_PARAMETER_TYPES)
6103       # local check
6104       hypervisor.GetHypervisor(
6105         instance.hypervisor).CheckParameterSyntax(hv_new)
6106       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6107       self.hv_new = hv_new # the new actual values
6108       self.hv_inst = i_hvdict # the new dict (without defaults)
6109     else:
6110       self.hv_new = self.hv_inst = {}
6111
6112     # beparams processing
6113     if self.op.beparams:
6114       i_bedict, be_new = self._GetUpdatedParams(
6115                              instance.beparams, self.op.beparams,
6116                              cluster.beparams[constants.PP_DEFAULT],
6117                              constants.BES_PARAMETER_TYPES)
6118       self.be_new = be_new # the new actual values
6119       self.be_inst = i_bedict # the new dict (without defaults)
6120     else:
6121       self.be_new = self.be_inst = {}
6122
6123     self.warn = []
6124
6125     if constants.BE_MEMORY in self.op.beparams and not self.force:
6126       mem_check_list = [pnode]
6127       if be_new[constants.BE_AUTO_BALANCE]:
6128         # either we changed auto_balance to yes or it was from before
6129         mem_check_list.extend(instance.secondary_nodes)
6130       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6131                                                   instance.hypervisor)
6132       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6133                                          instance.hypervisor)
6134       pninfo = nodeinfo[pnode]
6135       msg = pninfo.RemoteFailMsg()
6136       if msg:
6137         # Assume the primary node is unreachable and go ahead
6138         self.warn.append("Can't get info from primary node %s: %s" %
6139                          (pnode,  msg))
6140       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6141         self.warn.append("Node data from primary node %s doesn't contain"
6142                          " free memory information" % pnode)
6143       elif instance_info.RemoteFailMsg():
6144         self.warn.append("Can't get instance runtime information: %s" %
6145                         instance_info.RemoteFailMsg())
6146       else:
6147         if instance_info.payload:
6148           current_mem = int(instance_info.payload['memory'])
6149         else:
6150           # Assume instance not running
6151           # (there is a slight race condition here, but it's not very probable,
6152           # and we have no other way to check)
6153           current_mem = 0
6154         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6155                     pninfo.payload['memory_free'])
6156         if miss_mem > 0:
6157           raise errors.OpPrereqError("This change will prevent the instance"
6158                                      " from starting, due to %d MB of memory"
6159                                      " missing on its primary node" % miss_mem)
6160
6161       if be_new[constants.BE_AUTO_BALANCE]:
6162         for node, nres in nodeinfo.items():
6163           if node not in instance.secondary_nodes:
6164             continue
6165           msg = nres.RemoteFailMsg()
6166           if msg:
6167             self.warn.append("Can't get info from secondary node %s: %s" %
6168                              (node, msg))
6169           elif not isinstance(nres.payload.get('memory_free', None), int):
6170             self.warn.append("Secondary node %s didn't return free"
6171                              " memory information" % node)
6172           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6173             self.warn.append("Not enough memory to failover instance to"
6174                              " secondary node %s" % node)
6175
6176     # NIC processing
6177     self.nic_pnew = {}
6178     self.nic_pinst = {}
6179     for nic_op, nic_dict in self.op.nics:
6180       if nic_op == constants.DDM_REMOVE:
6181         if not instance.nics:
6182           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6183         continue
6184       if nic_op != constants.DDM_ADD:
6185         # an existing nic
6186         if nic_op < 0 or nic_op >= len(instance.nics):
6187           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6188                                      " are 0 to %d" %
6189                                      (nic_op, len(instance.nics)))
6190         old_nic_params = instance.nics[nic_op].nicparams
6191         old_nic_ip = instance.nics[nic_op].ip
6192       else:
6193         old_nic_params = {}
6194         old_nic_ip = None
6195
6196       update_params_dict = dict([(key, nic_dict[key])
6197                                  for key in constants.NICS_PARAMETERS
6198                                  if key in nic_dict])
6199
6200       if 'bridge' in nic_dict:
6201         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6202
6203       new_nic_params, new_filled_nic_params = \
6204           self._GetUpdatedParams(old_nic_params, update_params_dict,
6205                                  cluster.nicparams[constants.PP_DEFAULT],
6206                                  constants.NICS_PARAMETER_TYPES)
6207       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6208       self.nic_pinst[nic_op] = new_nic_params
6209       self.nic_pnew[nic_op] = new_filled_nic_params
6210       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6211
6212       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6213         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6214         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6215         msg = result.RemoteFailMsg()
6216         if msg:
6217           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6218           if self.force:
6219             self.warn.append(msg)
6220           else:
6221             raise errors.OpPrereqError(msg)
6222       if new_nic_mode == constants.NIC_MODE_ROUTED:
6223         if 'ip' in nic_dict:
6224           nic_ip = nic_dict['ip']
6225         else:
6226           nic_ip = old_nic_ip
6227         if nic_ip is None:
6228           raise errors.OpPrereqError('Cannot set the nic ip to None'
6229                                      ' on a routed nic')
6230       if 'mac' in nic_dict:
6231         nic_mac = nic_dict['mac']
6232         if nic_mac is None:
6233           raise errors.OpPrereqError('Cannot set the nic mac to None')
6234         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6235           # otherwise generate the mac
6236           nic_dict['mac'] = self.cfg.GenerateMAC()
6237         else:
6238           # or validate/reserve the current one
6239           if self.cfg.IsMacInUse(nic_mac):
6240             raise errors.OpPrereqError("MAC address %s already in use"
6241                                        " in cluster" % nic_mac)
6242
6243     # DISK processing
6244     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6245       raise errors.OpPrereqError("Disk operations not supported for"
6246                                  " diskless instances")
6247     for disk_op, disk_dict in self.op.disks:
6248       if disk_op == constants.DDM_REMOVE:
6249         if len(instance.disks) == 1:
6250           raise errors.OpPrereqError("Cannot remove the last disk of"
6251                                      " an instance")
6252         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6253         ins_l = ins_l[pnode]
6254         msg = ins_l.RemoteFailMsg()
6255         if msg:
6256           raise errors.OpPrereqError("Can't contact node %s: %s" %
6257                                      (pnode, msg))
6258         if instance.name in ins_l.payload:
6259           raise errors.OpPrereqError("Instance is running, can't remove"
6260                                      " disks.")
6261
6262       if (disk_op == constants.DDM_ADD and
6263           len(instance.nics) >= constants.MAX_DISKS):
6264         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6265                                    " add more" % constants.MAX_DISKS)
6266       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6267         # an existing disk
6268         if disk_op < 0 or disk_op >= len(instance.disks):
6269           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6270                                      " are 0 to %d" %
6271                                      (disk_op, len(instance.disks)))
6272
6273     return
6274
6275   def Exec(self, feedback_fn):
6276     """Modifies an instance.
6277
6278     All parameters take effect only at the next restart of the instance.
6279
6280     """
6281     # Process here the warnings from CheckPrereq, as we don't have a
6282     # feedback_fn there.
6283     for warn in self.warn:
6284       feedback_fn("WARNING: %s" % warn)
6285
6286     result = []
6287     instance = self.instance
6288     cluster = self.cluster
6289     # disk changes
6290     for disk_op, disk_dict in self.op.disks:
6291       if disk_op == constants.DDM_REMOVE:
6292         # remove the last disk
6293         device = instance.disks.pop()
6294         device_idx = len(instance.disks)
6295         for node, disk in device.ComputeNodeTree(instance.primary_node):
6296           self.cfg.SetDiskID(disk, node)
6297           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6298           if msg:
6299             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6300                             " continuing anyway", device_idx, node, msg)
6301         result.append(("disk/%d" % device_idx, "remove"))
6302       elif disk_op == constants.DDM_ADD:
6303         # add a new disk
6304         if instance.disk_template == constants.DT_FILE:
6305           file_driver, file_path = instance.disks[0].logical_id
6306           file_path = os.path.dirname(file_path)
6307         else:
6308           file_driver = file_path = None
6309         disk_idx_base = len(instance.disks)
6310         new_disk = _GenerateDiskTemplate(self,
6311                                          instance.disk_template,
6312                                          instance.name, instance.primary_node,
6313                                          instance.secondary_nodes,
6314                                          [disk_dict],
6315                                          file_path,
6316                                          file_driver,
6317                                          disk_idx_base)[0]
6318         instance.disks.append(new_disk)
6319         info = _GetInstanceInfoText(instance)
6320
6321         logging.info("Creating volume %s for instance %s",
6322                      new_disk.iv_name, instance.name)
6323         # Note: this needs to be kept in sync with _CreateDisks
6324         #HARDCODE
6325         for node in instance.all_nodes:
6326           f_create = node == instance.primary_node
6327           try:
6328             _CreateBlockDev(self, node, instance, new_disk,
6329                             f_create, info, f_create)
6330           except errors.OpExecError, err:
6331             self.LogWarning("Failed to create volume %s (%s) on"
6332                             " node %s: %s",
6333                             new_disk.iv_name, new_disk, node, err)
6334         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6335                        (new_disk.size, new_disk.mode)))
6336       else:
6337         # change a given disk
6338         instance.disks[disk_op].mode = disk_dict['mode']
6339         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6340     # NIC changes
6341     for nic_op, nic_dict in self.op.nics:
6342       if nic_op == constants.DDM_REMOVE:
6343         # remove the last nic
6344         del instance.nics[-1]
6345         result.append(("nic.%d" % len(instance.nics), "remove"))
6346       elif nic_op == constants.DDM_ADD:
6347         # mac and bridge should be set, by now
6348         mac = nic_dict['mac']
6349         ip = nic_dict.get('ip', None)
6350         nicparams = self.nic_pinst[constants.DDM_ADD]
6351         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6352         instance.nics.append(new_nic)
6353         result.append(("nic.%d" % (len(instance.nics) - 1),
6354                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6355                        (new_nic.mac, new_nic.ip,
6356                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6357                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6358                        )))
6359       else:
6360         for key in 'mac', 'ip':
6361           if key in nic_dict:
6362             setattr(instance.nics[nic_op], key, nic_dict[key])
6363         if nic_op in self.nic_pnew:
6364           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6365         for key, val in nic_dict.iteritems():
6366           result.append(("nic.%s/%d" % (key, nic_op), val))
6367
6368     # hvparams changes
6369     if self.op.hvparams:
6370       instance.hvparams = self.hv_inst
6371       for key, val in self.op.hvparams.iteritems():
6372         result.append(("hv/%s" % key, val))
6373
6374     # beparams changes
6375     if self.op.beparams:
6376       instance.beparams = self.be_inst
6377       for key, val in self.op.beparams.iteritems():
6378         result.append(("be/%s" % key, val))
6379
6380     self.cfg.Update(instance)
6381
6382     return result
6383
6384
6385 class LUQueryExports(NoHooksLU):
6386   """Query the exports list
6387
6388   """
6389   _OP_REQP = ['nodes']
6390   REQ_BGL = False
6391
6392   def ExpandNames(self):
6393     self.needed_locks = {}
6394     self.share_locks[locking.LEVEL_NODE] = 1
6395     if not self.op.nodes:
6396       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6397     else:
6398       self.needed_locks[locking.LEVEL_NODE] = \
6399         _GetWantedNodes(self, self.op.nodes)
6400
6401   def CheckPrereq(self):
6402     """Check prerequisites.
6403
6404     """
6405     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6406
6407   def Exec(self, feedback_fn):
6408     """Compute the list of all the exported system images.
6409
6410     @rtype: dict
6411     @return: a dictionary with the structure node->(export-list)
6412         where export-list is a list of the instances exported on
6413         that node.
6414
6415     """
6416     rpcresult = self.rpc.call_export_list(self.nodes)
6417     result = {}
6418     for node in rpcresult:
6419       if rpcresult[node].RemoteFailMsg():
6420         result[node] = False
6421       else:
6422         result[node] = rpcresult[node].payload
6423
6424     return result
6425
6426
6427 class LUExportInstance(LogicalUnit):
6428   """Export an instance to an image in the cluster.
6429
6430   """
6431   HPATH = "instance-export"
6432   HTYPE = constants.HTYPE_INSTANCE
6433   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6434   REQ_BGL = False
6435
6436   def ExpandNames(self):
6437     self._ExpandAndLockInstance()
6438     # FIXME: lock only instance primary and destination node
6439     #
6440     # Sad but true, for now we have do lock all nodes, as we don't know where
6441     # the previous export might be, and and in this LU we search for it and
6442     # remove it from its current node. In the future we could fix this by:
6443     #  - making a tasklet to search (share-lock all), then create the new one,
6444     #    then one to remove, after
6445     #  - removing the removal operation altoghether
6446     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6447
6448   def DeclareLocks(self, level):
6449     """Last minute lock declaration."""
6450     # All nodes are locked anyway, so nothing to do here.
6451
6452   def BuildHooksEnv(self):
6453     """Build hooks env.
6454
6455     This will run on the master, primary node and target node.
6456
6457     """
6458     env = {
6459       "EXPORT_NODE": self.op.target_node,
6460       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6461       }
6462     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6463     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6464           self.op.target_node]
6465     return env, nl, nl
6466
6467   def CheckPrereq(self):
6468     """Check prerequisites.
6469
6470     This checks that the instance and node names are valid.
6471
6472     """
6473     instance_name = self.op.instance_name
6474     self.instance = self.cfg.GetInstanceInfo(instance_name)
6475     assert self.instance is not None, \
6476           "Cannot retrieve locked instance %s" % self.op.instance_name
6477     _CheckNodeOnline(self, self.instance.primary_node)
6478
6479     self.dst_node = self.cfg.GetNodeInfo(
6480       self.cfg.ExpandNodeName(self.op.target_node))
6481
6482     if self.dst_node is None:
6483       # This is wrong node name, not a non-locked node
6484       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6485     _CheckNodeOnline(self, self.dst_node.name)
6486     _CheckNodeNotDrained(self, self.dst_node.name)
6487
6488     # instance disk type verification
6489     for disk in self.instance.disks:
6490       if disk.dev_type == constants.LD_FILE:
6491         raise errors.OpPrereqError("Export not supported for instances with"
6492                                    " file-based disks")
6493
6494   def Exec(self, feedback_fn):
6495     """Export an instance to an image in the cluster.
6496
6497     """
6498     instance = self.instance
6499     dst_node = self.dst_node
6500     src_node = instance.primary_node
6501     if self.op.shutdown:
6502       # shutdown the instance, but not the disks
6503       result = self.rpc.call_instance_shutdown(src_node, instance)
6504       msg = result.RemoteFailMsg()
6505       if msg:
6506         raise errors.OpExecError("Could not shutdown instance %s on"
6507                                  " node %s: %s" %
6508                                  (instance.name, src_node, msg))
6509
6510     vgname = self.cfg.GetVGName()
6511
6512     snap_disks = []
6513
6514     # set the disks ID correctly since call_instance_start needs the
6515     # correct drbd minor to create the symlinks
6516     for disk in instance.disks:
6517       self.cfg.SetDiskID(disk, src_node)
6518
6519     try:
6520       for disk in instance.disks:
6521         # result.payload will be a snapshot of an lvm leaf of the one we passed
6522         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6523         msg = result.RemoteFailMsg()
6524         if msg:
6525           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6526                           disk.logical_id[1], src_node, msg)
6527           snap_disks.append(False)
6528         else:
6529           disk_id = (vgname, result.payload)
6530           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6531                                  logical_id=disk_id, physical_id=disk_id,
6532                                  iv_name=disk.iv_name)
6533           snap_disks.append(new_dev)
6534
6535     finally:
6536       if self.op.shutdown and instance.admin_up:
6537         result = self.rpc.call_instance_start(src_node, instance, None, None)
6538         msg = result.RemoteFailMsg()
6539         if msg:
6540           _ShutdownInstanceDisks(self, instance)
6541           raise errors.OpExecError("Could not start instance: %s" % msg)
6542
6543     # TODO: check for size
6544
6545     cluster_name = self.cfg.GetClusterName()
6546     for idx, dev in enumerate(snap_disks):
6547       if dev:
6548         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6549                                                instance, cluster_name, idx)
6550         msg = result.RemoteFailMsg()
6551         if msg:
6552           self.LogWarning("Could not export block device %s from node %s to"
6553                           " node %s: %s", dev.logical_id[1], src_node,
6554                           dst_node.name, msg)
6555         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6556         if msg:
6557           self.LogWarning("Could not remove snapshot block device %s from node"
6558                           " %s: %s", dev.logical_id[1], src_node, msg)
6559
6560     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6561     msg = result.RemoteFailMsg()
6562     if msg:
6563       self.LogWarning("Could not finalize export for instance %s"
6564                       " on node %s: %s", instance.name, dst_node.name, msg)
6565
6566     nodelist = self.cfg.GetNodeList()
6567     nodelist.remove(dst_node.name)
6568
6569     # on one-node clusters nodelist will be empty after the removal
6570     # if we proceed the backup would be removed because OpQueryExports
6571     # substitutes an empty list with the full cluster node list.
6572     iname = instance.name
6573     if nodelist:
6574       exportlist = self.rpc.call_export_list(nodelist)
6575       for node in exportlist:
6576         if exportlist[node].RemoteFailMsg():
6577           continue
6578         if iname in exportlist[node].payload:
6579           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6580           if msg:
6581             self.LogWarning("Could not remove older export for instance %s"
6582                             " on node %s: %s", iname, node, msg)
6583
6584
6585 class LURemoveExport(NoHooksLU):
6586   """Remove exports related to the named instance.
6587
6588   """
6589   _OP_REQP = ["instance_name"]
6590   REQ_BGL = False
6591
6592   def ExpandNames(self):
6593     self.needed_locks = {}
6594     # We need all nodes to be locked in order for RemoveExport to work, but we
6595     # don't need to lock the instance itself, as nothing will happen to it (and
6596     # we can remove exports also for a removed instance)
6597     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6598
6599   def CheckPrereq(self):
6600     """Check prerequisites.
6601     """
6602     pass
6603
6604   def Exec(self, feedback_fn):
6605     """Remove any export.
6606
6607     """
6608     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6609     # If the instance was not found we'll try with the name that was passed in.
6610     # This will only work if it was an FQDN, though.
6611     fqdn_warn = False
6612     if not instance_name:
6613       fqdn_warn = True
6614       instance_name = self.op.instance_name
6615
6616     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6617     exportlist = self.rpc.call_export_list(locked_nodes)
6618     found = False
6619     for node in exportlist:
6620       msg = exportlist[node].RemoteFailMsg()
6621       if msg:
6622         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6623         continue
6624       if instance_name in exportlist[node].payload:
6625         found = True
6626         result = self.rpc.call_export_remove(node, instance_name)
6627         msg = result.RemoteFailMsg()
6628         if msg:
6629           logging.error("Could not remove export for instance %s"
6630                         " on node %s: %s", instance_name, node, msg)
6631
6632     if fqdn_warn and not found:
6633       feedback_fn("Export not found. If trying to remove an export belonging"
6634                   " to a deleted instance please use its Fully Qualified"
6635                   " Domain Name.")
6636
6637
6638 class TagsLU(NoHooksLU):
6639   """Generic tags LU.
6640
6641   This is an abstract class which is the parent of all the other tags LUs.
6642
6643   """
6644
6645   def ExpandNames(self):
6646     self.needed_locks = {}
6647     if self.op.kind == constants.TAG_NODE:
6648       name = self.cfg.ExpandNodeName(self.op.name)
6649       if name is None:
6650         raise errors.OpPrereqError("Invalid node name (%s)" %
6651                                    (self.op.name,))
6652       self.op.name = name
6653       self.needed_locks[locking.LEVEL_NODE] = name
6654     elif self.op.kind == constants.TAG_INSTANCE:
6655       name = self.cfg.ExpandInstanceName(self.op.name)
6656       if name is None:
6657         raise errors.OpPrereqError("Invalid instance name (%s)" %
6658                                    (self.op.name,))
6659       self.op.name = name
6660       self.needed_locks[locking.LEVEL_INSTANCE] = name
6661
6662   def CheckPrereq(self):
6663     """Check prerequisites.
6664
6665     """
6666     if self.op.kind == constants.TAG_CLUSTER:
6667       self.target = self.cfg.GetClusterInfo()
6668     elif self.op.kind == constants.TAG_NODE:
6669       self.target = self.cfg.GetNodeInfo(self.op.name)
6670     elif self.op.kind == constants.TAG_INSTANCE:
6671       self.target = self.cfg.GetInstanceInfo(self.op.name)
6672     else:
6673       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6674                                  str(self.op.kind))
6675
6676
6677 class LUGetTags(TagsLU):
6678   """Returns the tags of a given object.
6679
6680   """
6681   _OP_REQP = ["kind", "name"]
6682   REQ_BGL = False
6683
6684   def Exec(self, feedback_fn):
6685     """Returns the tag list.
6686
6687     """
6688     return list(self.target.GetTags())
6689
6690
6691 class LUSearchTags(NoHooksLU):
6692   """Searches the tags for a given pattern.
6693
6694   """
6695   _OP_REQP = ["pattern"]
6696   REQ_BGL = False
6697
6698   def ExpandNames(self):
6699     self.needed_locks = {}
6700
6701   def CheckPrereq(self):
6702     """Check prerequisites.
6703
6704     This checks the pattern passed for validity by compiling it.
6705
6706     """
6707     try:
6708       self.re = re.compile(self.op.pattern)
6709     except re.error, err:
6710       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6711                                  (self.op.pattern, err))
6712
6713   def Exec(self, feedback_fn):
6714     """Returns the tag list.
6715
6716     """
6717     cfg = self.cfg
6718     tgts = [("/cluster", cfg.GetClusterInfo())]
6719     ilist = cfg.GetAllInstancesInfo().values()
6720     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6721     nlist = cfg.GetAllNodesInfo().values()
6722     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6723     results = []
6724     for path, target in tgts:
6725       for tag in target.GetTags():
6726         if self.re.search(tag):
6727           results.append((path, tag))
6728     return results
6729
6730
6731 class LUAddTags(TagsLU):
6732   """Sets a tag on a given object.
6733
6734   """
6735   _OP_REQP = ["kind", "name", "tags"]
6736   REQ_BGL = False
6737
6738   def CheckPrereq(self):
6739     """Check prerequisites.
6740
6741     This checks the type and length of the tag name and value.
6742
6743     """
6744     TagsLU.CheckPrereq(self)
6745     for tag in self.op.tags:
6746       objects.TaggableObject.ValidateTag(tag)
6747
6748   def Exec(self, feedback_fn):
6749     """Sets the tag.
6750
6751     """
6752     try:
6753       for tag in self.op.tags:
6754         self.target.AddTag(tag)
6755     except errors.TagError, err:
6756       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6757     try:
6758       self.cfg.Update(self.target)
6759     except errors.ConfigurationError:
6760       raise errors.OpRetryError("There has been a modification to the"
6761                                 " config file and the operation has been"
6762                                 " aborted. Please retry.")
6763
6764
6765 class LUDelTags(TagsLU):
6766   """Delete a list of tags from a given object.
6767
6768   """
6769   _OP_REQP = ["kind", "name", "tags"]
6770   REQ_BGL = False
6771
6772   def CheckPrereq(self):
6773     """Check prerequisites.
6774
6775     This checks that we have the given tag.
6776
6777     """
6778     TagsLU.CheckPrereq(self)
6779     for tag in self.op.tags:
6780       objects.TaggableObject.ValidateTag(tag)
6781     del_tags = frozenset(self.op.tags)
6782     cur_tags = self.target.GetTags()
6783     if not del_tags <= cur_tags:
6784       diff_tags = del_tags - cur_tags
6785       diff_names = ["'%s'" % tag for tag in diff_tags]
6786       diff_names.sort()
6787       raise errors.OpPrereqError("Tag(s) %s not found" %
6788                                  (",".join(diff_names)))
6789
6790   def Exec(self, feedback_fn):
6791     """Remove the tag from the object.
6792
6793     """
6794     for tag in self.op.tags:
6795       self.target.RemoveTag(tag)
6796     try:
6797       self.cfg.Update(self.target)
6798     except errors.ConfigurationError:
6799       raise errors.OpRetryError("There has been a modification to the"
6800                                 " config file and the operation has been"
6801                                 " aborted. Please retry.")
6802
6803
6804 class LUTestDelay(NoHooksLU):
6805   """Sleep for a specified amount of time.
6806
6807   This LU sleeps on the master and/or nodes for a specified amount of
6808   time.
6809
6810   """
6811   _OP_REQP = ["duration", "on_master", "on_nodes"]
6812   REQ_BGL = False
6813
6814   def ExpandNames(self):
6815     """Expand names and set required locks.
6816
6817     This expands the node list, if any.
6818
6819     """
6820     self.needed_locks = {}
6821     if self.op.on_nodes:
6822       # _GetWantedNodes can be used here, but is not always appropriate to use
6823       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6824       # more information.
6825       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6826       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6827
6828   def CheckPrereq(self):
6829     """Check prerequisites.
6830
6831     """
6832
6833   def Exec(self, feedback_fn):
6834     """Do the actual sleep.
6835
6836     """
6837     if self.op.on_master:
6838       if not utils.TestDelay(self.op.duration):
6839         raise errors.OpExecError("Error during master delay test")
6840     if self.op.on_nodes:
6841       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6842       for node, node_result in result.items():
6843         msg = node_result.RemoteFailMsg()
6844         if msg:
6845           raise errors.OpExecError("Failure during rpc call to node %s: %s"
6846                                    % (node, msg))
6847
6848
6849 class IAllocator(object):
6850   """IAllocator framework.
6851
6852   An IAllocator instance has three sets of attributes:
6853     - cfg that is needed to query the cluster
6854     - input data (all members of the _KEYS class attribute are required)
6855     - four buffer attributes (in|out_data|text), that represent the
6856       input (to the external script) in text and data structure format,
6857       and the output from it, again in two formats
6858     - the result variables from the script (success, info, nodes) for
6859       easy usage
6860
6861   """
6862   _ALLO_KEYS = [
6863     "mem_size", "disks", "disk_template",
6864     "os", "tags", "nics", "vcpus", "hypervisor",
6865     ]
6866   _RELO_KEYS = [
6867     "relocate_from",
6868     ]
6869
6870   def __init__(self, lu, mode, name, **kwargs):
6871     self.lu = lu
6872     # init buffer variables
6873     self.in_text = self.out_text = self.in_data = self.out_data = None
6874     # init all input fields so that pylint is happy
6875     self.mode = mode
6876     self.name = name
6877     self.mem_size = self.disks = self.disk_template = None
6878     self.os = self.tags = self.nics = self.vcpus = None
6879     self.hypervisor = None
6880     self.relocate_from = None
6881     # computed fields
6882     self.required_nodes = None
6883     # init result fields
6884     self.success = self.info = self.nodes = None
6885     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6886       keyset = self._ALLO_KEYS
6887     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6888       keyset = self._RELO_KEYS
6889     else:
6890       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6891                                    " IAllocator" % self.mode)
6892     for key in kwargs:
6893       if key not in keyset:
6894         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6895                                      " IAllocator" % key)
6896       setattr(self, key, kwargs[key])
6897     for key in keyset:
6898       if key not in kwargs:
6899         raise errors.ProgrammerError("Missing input parameter '%s' to"
6900                                      " IAllocator" % key)
6901     self._BuildInputData()
6902
6903   def _ComputeClusterData(self):
6904     """Compute the generic allocator input data.
6905
6906     This is the data that is independent of the actual operation.
6907
6908     """
6909     cfg = self.lu.cfg
6910     cluster_info = cfg.GetClusterInfo()
6911     # cluster data
6912     data = {
6913       "version": constants.IALLOCATOR_VERSION,
6914       "cluster_name": cfg.GetClusterName(),
6915       "cluster_tags": list(cluster_info.GetTags()),
6916       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6917       # we don't have job IDs
6918       }
6919     iinfo = cfg.GetAllInstancesInfo().values()
6920     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6921
6922     # node data
6923     node_results = {}
6924     node_list = cfg.GetNodeList()
6925
6926     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6927       hypervisor_name = self.hypervisor
6928     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6929       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6930
6931     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6932                                            hypervisor_name)
6933     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6934                        cluster_info.enabled_hypervisors)
6935     for nname, nresult in node_data.items():
6936       # first fill in static (config-based) values
6937       ninfo = cfg.GetNodeInfo(nname)
6938       pnr = {
6939         "tags": list(ninfo.GetTags()),
6940         "primary_ip": ninfo.primary_ip,
6941         "secondary_ip": ninfo.secondary_ip,
6942         "offline": ninfo.offline,
6943         "drained": ninfo.drained,
6944         "master_candidate": ninfo.master_candidate,
6945         }
6946
6947       if not ninfo.offline:
6948         msg = nresult.RemoteFailMsg()
6949         if msg:
6950           raise errors.OpExecError("Can't get data for node %s: %s" %
6951                                    (nname, msg))
6952         msg = node_iinfo[nname].RemoteFailMsg()
6953         if msg:
6954           raise errors.OpExecError("Can't get node instance info"
6955                                    " from node %s: %s" % (nname, msg))
6956         remote_info = nresult.payload
6957         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6958                      'vg_size', 'vg_free', 'cpu_total']:
6959           if attr not in remote_info:
6960             raise errors.OpExecError("Node '%s' didn't return attribute"
6961                                      " '%s'" % (nname, attr))
6962           if not isinstance(remote_info[attr], int):
6963             raise errors.OpExecError("Node '%s' returned invalid value"
6964                                      " for '%s': %s" %
6965                                      (nname, attr, remote_info[attr]))
6966         # compute memory used by primary instances
6967         i_p_mem = i_p_up_mem = 0
6968         for iinfo, beinfo in i_list:
6969           if iinfo.primary_node == nname:
6970             i_p_mem += beinfo[constants.BE_MEMORY]
6971             if iinfo.name not in node_iinfo[nname].payload:
6972               i_used_mem = 0
6973             else:
6974               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6975             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6976             remote_info['memory_free'] -= max(0, i_mem_diff)
6977
6978             if iinfo.admin_up:
6979               i_p_up_mem += beinfo[constants.BE_MEMORY]
6980
6981         # compute memory used by instances
6982         pnr_dyn = {
6983           "total_memory": remote_info['memory_total'],
6984           "reserved_memory": remote_info['memory_dom0'],
6985           "free_memory": remote_info['memory_free'],
6986           "total_disk": remote_info['vg_size'],
6987           "free_disk": remote_info['vg_free'],
6988           "total_cpus": remote_info['cpu_total'],
6989           "i_pri_memory": i_p_mem,
6990           "i_pri_up_memory": i_p_up_mem,
6991           }
6992         pnr.update(pnr_dyn)
6993
6994       node_results[nname] = pnr
6995     data["nodes"] = node_results
6996
6997     # instance data
6998     instance_data = {}
6999     for iinfo, beinfo in i_list:
7000       nic_data = []
7001       for nic in iinfo.nics:
7002         filled_params = objects.FillDict(
7003             cluster_info.nicparams[constants.PP_DEFAULT],
7004             nic.nicparams)
7005         nic_dict = {"mac": nic.mac,
7006                     "ip": nic.ip,
7007                     "mode": filled_params[constants.NIC_MODE],
7008                     "link": filled_params[constants.NIC_LINK],
7009                    }
7010         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7011           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7012         nic_data.append(nic_dict)
7013       pir = {
7014         "tags": list(iinfo.GetTags()),
7015         "admin_up": iinfo.admin_up,
7016         "vcpus": beinfo[constants.BE_VCPUS],
7017         "memory": beinfo[constants.BE_MEMORY],
7018         "os": iinfo.os,
7019         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7020         "nics": nic_data,
7021         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7022         "disk_template": iinfo.disk_template,
7023         "hypervisor": iinfo.hypervisor,
7024         }
7025       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7026                                                  pir["disks"])
7027       instance_data[iinfo.name] = pir
7028
7029     data["instances"] = instance_data
7030
7031     self.in_data = data
7032
7033   def _AddNewInstance(self):
7034     """Add new instance data to allocator structure.
7035
7036     This in combination with _AllocatorGetClusterData will create the
7037     correct structure needed as input for the allocator.
7038
7039     The checks for the completeness of the opcode must have already been
7040     done.
7041
7042     """
7043     data = self.in_data
7044
7045     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7046
7047     if self.disk_template in constants.DTS_NET_MIRROR:
7048       self.required_nodes = 2
7049     else:
7050       self.required_nodes = 1
7051     request = {
7052       "type": "allocate",
7053       "name": self.name,
7054       "disk_template": self.disk_template,
7055       "tags": self.tags,
7056       "os": self.os,
7057       "vcpus": self.vcpus,
7058       "memory": self.mem_size,
7059       "disks": self.disks,
7060       "disk_space_total": disk_space,
7061       "nics": self.nics,
7062       "required_nodes": self.required_nodes,
7063       }
7064     data["request"] = request
7065
7066   def _AddRelocateInstance(self):
7067     """Add relocate instance data to allocator structure.
7068
7069     This in combination with _IAllocatorGetClusterData will create the
7070     correct structure needed as input for the allocator.
7071
7072     The checks for the completeness of the opcode must have already been
7073     done.
7074
7075     """
7076     instance = self.lu.cfg.GetInstanceInfo(self.name)
7077     if instance is None:
7078       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7079                                    " IAllocator" % self.name)
7080
7081     if instance.disk_template not in constants.DTS_NET_MIRROR:
7082       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7083
7084     if len(instance.secondary_nodes) != 1:
7085       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7086
7087     self.required_nodes = 1
7088     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7089     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7090
7091     request = {
7092       "type": "relocate",
7093       "name": self.name,
7094       "disk_space_total": disk_space,
7095       "required_nodes": self.required_nodes,
7096       "relocate_from": self.relocate_from,
7097       }
7098     self.in_data["request"] = request
7099
7100   def _BuildInputData(self):
7101     """Build input data structures.
7102
7103     """
7104     self._ComputeClusterData()
7105
7106     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7107       self._AddNewInstance()
7108     else:
7109       self._AddRelocateInstance()
7110
7111     self.in_text = serializer.Dump(self.in_data)
7112
7113   def Run(self, name, validate=True, call_fn=None):
7114     """Run an instance allocator and return the results.
7115
7116     """
7117     if call_fn is None:
7118       call_fn = self.lu.rpc.call_iallocator_runner
7119     data = self.in_text
7120
7121     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7122     msg = result.RemoteFailMsg()
7123     if msg:
7124       raise errors.OpExecError("Failure while running the iallocator"
7125                                " script: %s" % msg)
7126
7127     self.out_text = result.payload
7128     if validate:
7129       self._ValidateResult()
7130
7131   def _ValidateResult(self):
7132     """Process the allocator results.
7133
7134     This will process and if successful save the result in
7135     self.out_data and the other parameters.
7136
7137     """
7138     try:
7139       rdict = serializer.Load(self.out_text)
7140     except Exception, err:
7141       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7142
7143     if not isinstance(rdict, dict):
7144       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7145
7146     for key in "success", "info", "nodes":
7147       if key not in rdict:
7148         raise errors.OpExecError("Can't parse iallocator results:"
7149                                  " missing key '%s'" % key)
7150       setattr(self, key, rdict[key])
7151
7152     if not isinstance(rdict["nodes"], list):
7153       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7154                                " is not a list")
7155     self.out_data = rdict
7156
7157
7158 class LUTestAllocator(NoHooksLU):
7159   """Run allocator tests.
7160
7161   This LU runs the allocator tests
7162
7163   """
7164   _OP_REQP = ["direction", "mode", "name"]
7165
7166   def CheckPrereq(self):
7167     """Check prerequisites.
7168
7169     This checks the opcode parameters depending on the director and mode test.
7170
7171     """
7172     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7173       for attr in ["name", "mem_size", "disks", "disk_template",
7174                    "os", "tags", "nics", "vcpus"]:
7175         if not hasattr(self.op, attr):
7176           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7177                                      attr)
7178       iname = self.cfg.ExpandInstanceName(self.op.name)
7179       if iname is not None:
7180         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7181                                    iname)
7182       if not isinstance(self.op.nics, list):
7183         raise errors.OpPrereqError("Invalid parameter 'nics'")
7184       for row in self.op.nics:
7185         if (not isinstance(row, dict) or
7186             "mac" not in row or
7187             "ip" not in row or
7188             "bridge" not in row):
7189           raise errors.OpPrereqError("Invalid contents of the"
7190                                      " 'nics' parameter")
7191       if not isinstance(self.op.disks, list):
7192         raise errors.OpPrereqError("Invalid parameter 'disks'")
7193       for row in self.op.disks:
7194         if (not isinstance(row, dict) or
7195             "size" not in row or
7196             not isinstance(row["size"], int) or
7197             "mode" not in row or
7198             row["mode"] not in ['r', 'w']):
7199           raise errors.OpPrereqError("Invalid contents of the"
7200                                      " 'disks' parameter")
7201       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7202         self.op.hypervisor = self.cfg.GetHypervisorType()
7203     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7204       if not hasattr(self.op, "name"):
7205         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7206       fname = self.cfg.ExpandInstanceName(self.op.name)
7207       if fname is None:
7208         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7209                                    self.op.name)
7210       self.op.name = fname
7211       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7212     else:
7213       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7214                                  self.op.mode)
7215
7216     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7217       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7218         raise errors.OpPrereqError("Missing allocator name")
7219     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7220       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7221                                  self.op.direction)
7222
7223   def Exec(self, feedback_fn):
7224     """Run the allocator test.
7225
7226     """
7227     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7228       ial = IAllocator(self,
7229                        mode=self.op.mode,
7230                        name=self.op.name,
7231                        mem_size=self.op.mem_size,
7232                        disks=self.op.disks,
7233                        disk_template=self.op.disk_template,
7234                        os=self.op.os,
7235                        tags=self.op.tags,
7236                        nics=self.op.nics,
7237                        vcpus=self.op.vcpus,
7238                        hypervisor=self.op.hypervisor,
7239                        )
7240     else:
7241       ial = IAllocator(self,
7242                        mode=self.op.mode,
7243                        name=self.op.name,
7244                        relocate_from=list(self.relocate_from),
7245                        )
7246
7247     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7248       result = ial.in_text
7249     else:
7250       ial.Run(self.op.allocator, validate=False)
7251       result = ial.out_text
7252     return result