Convert node_start_master to new style result
[ganeti-local] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable-msg=W0613,W0201
25
26 import os
27 import os.path
28 import time
29 import tempfile
30 import re
31 import platform
32 import logging
33 import copy
34 import random
35
36 from ganeti import ssh
37 from ganeti import utils
38 from ganeti import errors
39 from ganeti import hypervisor
40 from ganeti import locking
41 from ganeti import constants
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import serializer
45 from ganeti import ssconf
46
47
48 class LogicalUnit(object):
49   """Logical Unit base class.
50
51   Subclasses must follow these rules:
52     - implement ExpandNames
53     - implement CheckPrereq
54     - implement Exec
55     - implement BuildHooksEnv
56     - redefine HPATH and HTYPE
57     - optionally redefine their run requirements:
58         REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
59
60   Note that all commands require root permissions.
61
62   """
63   HPATH = None
64   HTYPE = None
65   _OP_REQP = []
66   REQ_BGL = True
67
68   def __init__(self, processor, op, context, rpc):
69     """Constructor for LogicalUnit.
70
71     This needs to be overriden in derived classes in order to check op
72     validity.
73
74     """
75     self.proc = processor
76     self.op = op
77     self.cfg = context.cfg
78     self.context = context
79     self.rpc = rpc
80     # Dicts used to declare locking needs to mcpu
81     self.needed_locks = None
82     self.acquired_locks = {}
83     self.share_locks = dict(((i, 0) for i in locking.LEVELS))
84     self.add_locks = {}
85     self.remove_locks = {}
86     # Used to force good behavior when calling helper functions
87     self.recalculate_locks = {}
88     self.__ssh = None
89     # logging
90     self.LogWarning = processor.LogWarning
91     self.LogInfo = processor.LogInfo
92
93     for attr_name in self._OP_REQP:
94       attr_val = getattr(op, attr_name, None)
95       if attr_val is None:
96         raise errors.OpPrereqError("Required parameter '%s' missing" %
97                                    attr_name)
98     self.CheckArguments()
99
100   def __GetSSH(self):
101     """Returns the SshRunner object
102
103     """
104     if not self.__ssh:
105       self.__ssh = ssh.SshRunner(self.cfg.GetClusterName())
106     return self.__ssh
107
108   ssh = property(fget=__GetSSH)
109
110   def CheckArguments(self):
111     """Check syntactic validity for the opcode arguments.
112
113     This method is for doing a simple syntactic check and ensure
114     validity of opcode parameters, without any cluster-related
115     checks. While the same can be accomplished in ExpandNames and/or
116     CheckPrereq, doing these separate is better because:
117
118       - ExpandNames is left as as purely a lock-related function
119       - CheckPrereq is run after we have aquired locks (and possible
120         waited for them)
121
122     The function is allowed to change the self.op attribute so that
123     later methods can no longer worry about missing parameters.
124
125     """
126     pass
127
128   def ExpandNames(self):
129     """Expand names for this LU.
130
131     This method is called before starting to execute the opcode, and it should
132     update all the parameters of the opcode to their canonical form (e.g. a
133     short node name must be fully expanded after this method has successfully
134     completed). This way locking, hooks, logging, ecc. can work correctly.
135
136     LUs which implement this method must also populate the self.needed_locks
137     member, as a dict with lock levels as keys, and a list of needed lock names
138     as values. Rules:
139
140       - use an empty dict if you don't need any lock
141       - if you don't need any lock at a particular level omit that level
142       - don't put anything for the BGL level
143       - if you want all locks at a level use locking.ALL_SET as a value
144
145     If you need to share locks (rather than acquire them exclusively) at one
146     level you can modify self.share_locks, setting a true value (usually 1) for
147     that level. By default locks are not shared.
148
149     Examples::
150
151       # Acquire all nodes and one instance
152       self.needed_locks = {
153         locking.LEVEL_NODE: locking.ALL_SET,
154         locking.LEVEL_INSTANCE: ['instance1.example.tld'],
155       }
156       # Acquire just two nodes
157       self.needed_locks = {
158         locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
159       }
160       # Acquire no locks
161       self.needed_locks = {} # No, you can't leave it to the default value None
162
163     """
164     # The implementation of this method is mandatory only if the new LU is
165     # concurrent, so that old LUs don't need to be changed all at the same
166     # time.
167     if self.REQ_BGL:
168       self.needed_locks = {} # Exclusive LUs don't need locks.
169     else:
170       raise NotImplementedError
171
172   def DeclareLocks(self, level):
173     """Declare LU locking needs for a level
174
175     While most LUs can just declare their locking needs at ExpandNames time,
176     sometimes there's the need to calculate some locks after having acquired
177     the ones before. This function is called just before acquiring locks at a
178     particular level, but after acquiring the ones at lower levels, and permits
179     such calculations. It can be used to modify self.needed_locks, and by
180     default it does nothing.
181
182     This function is only called if you have something already set in
183     self.needed_locks for the level.
184
185     @param level: Locking level which is going to be locked
186     @type level: member of ganeti.locking.LEVELS
187
188     """
189
190   def CheckPrereq(self):
191     """Check prerequisites for this LU.
192
193     This method should check that the prerequisites for the execution
194     of this LU are fulfilled. It can do internode communication, but
195     it should be idempotent - no cluster or system changes are
196     allowed.
197
198     The method should raise errors.OpPrereqError in case something is
199     not fulfilled. Its return value is ignored.
200
201     This method should also update all the parameters of the opcode to
202     their canonical form if it hasn't been done by ExpandNames before.
203
204     """
205     raise NotImplementedError
206
207   def Exec(self, feedback_fn):
208     """Execute the LU.
209
210     This method should implement the actual work. It should raise
211     errors.OpExecError for failures that are somewhat dealt with in
212     code, or expected.
213
214     """
215     raise NotImplementedError
216
217   def BuildHooksEnv(self):
218     """Build hooks environment for this LU.
219
220     This method should return a three-node tuple consisting of: a dict
221     containing the environment that will be used for running the
222     specific hook for this LU, a list of node names on which the hook
223     should run before the execution, and a list of node names on which
224     the hook should run after the execution.
225
226     The keys of the dict must not have 'GANETI_' prefixed as this will
227     be handled in the hooks runner. Also note additional keys will be
228     added by the hooks runner. If the LU doesn't define any
229     environment, an empty dict (and not None) should be returned.
230
231     No nodes should be returned as an empty list (and not None).
232
233     Note that if the HPATH for a LU class is None, this function will
234     not be called.
235
236     """
237     raise NotImplementedError
238
239   def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
240     """Notify the LU about the results of its hooks.
241
242     This method is called every time a hooks phase is executed, and notifies
243     the Logical Unit about the hooks' result. The LU can then use it to alter
244     its result based on the hooks.  By default the method does nothing and the
245     previous result is passed back unchanged but any LU can define it if it
246     wants to use the local cluster hook-scripts somehow.
247
248     @param phase: one of L{constants.HOOKS_PHASE_POST} or
249         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
250     @param hook_results: the results of the multi-node hooks rpc call
251     @param feedback_fn: function used send feedback back to the caller
252     @param lu_result: the previous Exec result this LU had, or None
253         in the PRE phase
254     @return: the new Exec result, based on the previous result
255         and hook results
256
257     """
258     return lu_result
259
260   def _ExpandAndLockInstance(self):
261     """Helper function to expand and lock an instance.
262
263     Many LUs that work on an instance take its name in self.op.instance_name
264     and need to expand it and then declare the expanded name for locking. This
265     function does it, and then updates self.op.instance_name to the expanded
266     name. It also initializes needed_locks as a dict, if this hasn't been done
267     before.
268
269     """
270     if self.needed_locks is None:
271       self.needed_locks = {}
272     else:
273       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
274         "_ExpandAndLockInstance called with instance-level locks set"
275     expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
276     if expanded_name is None:
277       raise errors.OpPrereqError("Instance '%s' not known" %
278                                   self.op.instance_name)
279     self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
280     self.op.instance_name = expanded_name
281
282   def _LockInstancesNodes(self, primary_only=False):
283     """Helper function to declare instances' nodes for locking.
284
285     This function should be called after locking one or more instances to lock
286     their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
287     with all primary or secondary nodes for instances already locked and
288     present in self.needed_locks[locking.LEVEL_INSTANCE].
289
290     It should be called from DeclareLocks, and for safety only works if
291     self.recalculate_locks[locking.LEVEL_NODE] is set.
292
293     In the future it may grow parameters to just lock some instance's nodes, or
294     to just lock primaries or secondary nodes, if needed.
295
296     If should be called in DeclareLocks in a way similar to::
297
298       if level == locking.LEVEL_NODE:
299         self._LockInstancesNodes()
300
301     @type primary_only: boolean
302     @param primary_only: only lock primary nodes of locked instances
303
304     """
305     assert locking.LEVEL_NODE in self.recalculate_locks, \
306       "_LockInstancesNodes helper function called with no nodes to recalculate"
307
308     # TODO: check if we're really been called with the instance locks held
309
310     # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
311     # future we might want to have different behaviors depending on the value
312     # of self.recalculate_locks[locking.LEVEL_NODE]
313     wanted_nodes = []
314     for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
315       instance = self.context.cfg.GetInstanceInfo(instance_name)
316       wanted_nodes.append(instance.primary_node)
317       if not primary_only:
318         wanted_nodes.extend(instance.secondary_nodes)
319
320     if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
321       self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
322     elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
323       self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
324
325     del self.recalculate_locks[locking.LEVEL_NODE]
326
327
328 class NoHooksLU(LogicalUnit):
329   """Simple LU which runs no hooks.
330
331   This LU is intended as a parent for other LogicalUnits which will
332   run no hooks, in order to reduce duplicate code.
333
334   """
335   HPATH = None
336   HTYPE = None
337
338
339 def _GetWantedNodes(lu, nodes):
340   """Returns list of checked and expanded node names.
341
342   @type lu: L{LogicalUnit}
343   @param lu: the logical unit on whose behalf we execute
344   @type nodes: list
345   @param nodes: list of node names or None for all nodes
346   @rtype: list
347   @return: the list of nodes, sorted
348   @raise errors.OpProgrammerError: if the nodes parameter is wrong type
349
350   """
351   if not isinstance(nodes, list):
352     raise errors.OpPrereqError("Invalid argument type 'nodes'")
353
354   if not nodes:
355     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
356       " non-empty list of nodes whose name is to be expanded.")
357
358   wanted = []
359   for name in nodes:
360     node = lu.cfg.ExpandNodeName(name)
361     if node is None:
362       raise errors.OpPrereqError("No such node name '%s'" % name)
363     wanted.append(node)
364
365   return utils.NiceSort(wanted)
366
367
368 def _GetWantedInstances(lu, instances):
369   """Returns list of checked and expanded instance names.
370
371   @type lu: L{LogicalUnit}
372   @param lu: the logical unit on whose behalf we execute
373   @type instances: list
374   @param instances: list of instance names or None for all instances
375   @rtype: list
376   @return: the list of instances, sorted
377   @raise errors.OpPrereqError: if the instances parameter is wrong type
378   @raise errors.OpPrereqError: if any of the passed instances is not found
379
380   """
381   if not isinstance(instances, list):
382     raise errors.OpPrereqError("Invalid argument type 'instances'")
383
384   if instances:
385     wanted = []
386
387     for name in instances:
388       instance = lu.cfg.ExpandInstanceName(name)
389       if instance is None:
390         raise errors.OpPrereqError("No such instance name '%s'" % name)
391       wanted.append(instance)
392
393   else:
394     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
395   return wanted
396
397
398 def _CheckOutputFields(static, dynamic, selected):
399   """Checks whether all selected fields are valid.
400
401   @type static: L{utils.FieldSet}
402   @param static: static fields set
403   @type dynamic: L{utils.FieldSet}
404   @param dynamic: dynamic fields set
405
406   """
407   f = utils.FieldSet()
408   f.Extend(static)
409   f.Extend(dynamic)
410
411   delta = f.NonMatching(selected)
412   if delta:
413     raise errors.OpPrereqError("Unknown output fields selected: %s"
414                                % ",".join(delta))
415
416
417 def _CheckBooleanOpField(op, name):
418   """Validates boolean opcode parameters.
419
420   This will ensure that an opcode parameter is either a boolean value,
421   or None (but that it always exists).
422
423   """
424   val = getattr(op, name, None)
425   if not (val is None or isinstance(val, bool)):
426     raise errors.OpPrereqError("Invalid boolean parameter '%s' (%s)" %
427                                (name, str(val)))
428   setattr(op, name, val)
429
430
431 def _CheckNodeOnline(lu, node):
432   """Ensure that a given node is online.
433
434   @param lu: the LU on behalf of which we make the check
435   @param node: the node to check
436   @raise errors.OpPrereqError: if the node is offline
437
438   """
439   if lu.cfg.GetNodeInfo(node).offline:
440     raise errors.OpPrereqError("Can't use offline node %s" % node)
441
442
443 def _CheckNodeNotDrained(lu, node):
444   """Ensure that a given node is not drained.
445
446   @param lu: the LU on behalf of which we make the check
447   @param node: the node to check
448   @raise errors.OpPrereqError: if the node is drained
449
450   """
451   if lu.cfg.GetNodeInfo(node).drained:
452     raise errors.OpPrereqError("Can't use drained node %s" % node)
453
454
455 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
456                           memory, vcpus, nics, disk_template, disks):
457   """Builds instance related env variables for hooks
458
459   This builds the hook environment from individual variables.
460
461   @type name: string
462   @param name: the name of the instance
463   @type primary_node: string
464   @param primary_node: the name of the instance's primary node
465   @type secondary_nodes: list
466   @param secondary_nodes: list of secondary nodes as strings
467   @type os_type: string
468   @param os_type: the name of the instance's OS
469   @type status: boolean
470   @param status: the should_run status of the instance
471   @type memory: string
472   @param memory: the memory size of the instance
473   @type vcpus: string
474   @param vcpus: the count of VCPUs the instance has
475   @type nics: list
476   @param nics: list of tuples (ip, bridge, mac) representing
477       the NICs the instance  has
478   @type disk_template: string
479   @param disk_template: the distk template of the instance
480   @type disks: list
481   @param disks: the list of (size, mode) pairs
482   @rtype: dict
483   @return: the hook environment for this instance
484
485   """
486   if status:
487     str_status = "up"
488   else:
489     str_status = "down"
490   env = {
491     "OP_TARGET": name,
492     "INSTANCE_NAME": name,
493     "INSTANCE_PRIMARY": primary_node,
494     "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
495     "INSTANCE_OS_TYPE": os_type,
496     "INSTANCE_STATUS": str_status,
497     "INSTANCE_MEMORY": memory,
498     "INSTANCE_VCPUS": vcpus,
499     "INSTANCE_DISK_TEMPLATE": disk_template,
500   }
501
502   if nics:
503     nic_count = len(nics)
504     for idx, (ip, mac, mode, link) in enumerate(nics):
505       if ip is None:
506         ip = ""
507       env["INSTANCE_NIC%d_IP" % idx] = ip
508       env["INSTANCE_NIC%d_MAC" % idx] = mac
509       env["INSTANCE_NIC%d_MODE" % idx] = mode
510       env["INSTANCE_NIC%d_LINK" % idx] = link
511       if mode == constants.NIC_MODE_BRIDGED:
512         env["INSTANCE_NIC%d_BRIDGE" % idx] = link
513   else:
514     nic_count = 0
515
516   env["INSTANCE_NIC_COUNT"] = nic_count
517
518   if disks:
519     disk_count = len(disks)
520     for idx, (size, mode) in enumerate(disks):
521       env["INSTANCE_DISK%d_SIZE" % idx] = size
522       env["INSTANCE_DISK%d_MODE" % idx] = mode
523   else:
524     disk_count = 0
525
526   env["INSTANCE_DISK_COUNT"] = disk_count
527
528   return env
529
530 def _PreBuildNICHooksList(lu, nics):
531   """Build a list of nic information tuples.
532
533   This list is suitable to be passed to _BuildInstanceHookEnv.
534
535   @type lu:  L{LogicalUnit}
536   @param lu: the logical unit on whose behalf we execute
537   @type nics: list of L{objects.NIC}
538   @param nics: list of nics to convert to hooks tuples
539
540   """
541   hooks_nics = []
542   c_nicparams = lu.cfg.GetClusterInfo().nicparams[constants.PP_DEFAULT]
543   for nic in nics:
544     ip = nic.ip
545     mac = nic.mac
546     filled_params = objects.FillDict(c_nicparams, nic.nicparams)
547     mode = filled_params[constants.NIC_MODE]
548     link = filled_params[constants.NIC_LINK]
549     hooks_nics.append((ip, mac, mode, link))
550   return hooks_nics
551
552 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
553   """Builds instance related env variables for hooks from an object.
554
555   @type lu: L{LogicalUnit}
556   @param lu: the logical unit on whose behalf we execute
557   @type instance: L{objects.Instance}
558   @param instance: the instance for which we should build the
559       environment
560   @type override: dict
561   @param override: dictionary with key/values that will override
562       our values
563   @rtype: dict
564   @return: the hook environment dictionary
565
566   """
567   bep = lu.cfg.GetClusterInfo().FillBE(instance)
568   args = {
569     'name': instance.name,
570     'primary_node': instance.primary_node,
571     'secondary_nodes': instance.secondary_nodes,
572     'os_type': instance.os,
573     'status': instance.admin_up,
574     'memory': bep[constants.BE_MEMORY],
575     'vcpus': bep[constants.BE_VCPUS],
576     'nics': _PreBuildNICHooksList(lu, instance.nics),
577     'disk_template': instance.disk_template,
578     'disks': [(disk.size, disk.mode) for disk in instance.disks],
579   }
580   if override:
581     args.update(override)
582   return _BuildInstanceHookEnv(**args)
583
584
585 def _AdjustCandidatePool(lu):
586   """Adjust the candidate pool after node operations.
587
588   """
589   mod_list = lu.cfg.MaintainCandidatePool()
590   if mod_list:
591     lu.LogInfo("Promoted nodes to master candidate role: %s",
592                ", ".join(node.name for node in mod_list))
593     for name in mod_list:
594       lu.context.ReaddNode(name)
595   mc_now, mc_max = lu.cfg.GetMasterCandidateStats()
596   if mc_now > mc_max:
597     lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
598                (mc_now, mc_max))
599
600
601 def _CheckNicsBridgesExist(lu, target_nics, target_node,
602                                profile=constants.PP_DEFAULT):
603   """Check that the brigdes needed by a list of nics exist.
604
605   """
606   c_nicparams = lu.cfg.GetClusterInfo().nicparams[profile]
607   paramslist = [objects.FillDict(c_nicparams, nic.nicparams)
608                 for nic in target_nics]
609   brlist = [params[constants.NIC_LINK] for params in paramslist
610             if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
611   if brlist:
612     result = lu.rpc.call_bridges_exist(target_node, brlist)
613     msg = result.RemoteFailMsg()
614     if msg:
615       raise errors.OpPrereqError("Error checking bridges on destination node"
616                                  " '%s': %s" % (target_node, msg))
617
618
619 def _CheckInstanceBridgesExist(lu, instance, node=None):
620   """Check that the brigdes needed by an instance exist.
621
622   """
623   if node is None:
624     node=instance.primary_node
625   _CheckNicsBridgesExist(lu, instance.nics, node)
626
627
628 class LUDestroyCluster(NoHooksLU):
629   """Logical unit for destroying the cluster.
630
631   """
632   _OP_REQP = []
633
634   def CheckPrereq(self):
635     """Check prerequisites.
636
637     This checks whether the cluster is empty.
638
639     Any errors are signalled by raising errors.OpPrereqError.
640
641     """
642     master = self.cfg.GetMasterNode()
643
644     nodelist = self.cfg.GetNodeList()
645     if len(nodelist) != 1 or nodelist[0] != master:
646       raise errors.OpPrereqError("There are still %d node(s) in"
647                                  " this cluster." % (len(nodelist) - 1))
648     instancelist = self.cfg.GetInstanceList()
649     if instancelist:
650       raise errors.OpPrereqError("There are still %d instance(s) in"
651                                  " this cluster." % len(instancelist))
652
653   def Exec(self, feedback_fn):
654     """Destroys the cluster.
655
656     """
657     master = self.cfg.GetMasterNode()
658     result = self.rpc.call_node_stop_master(master, False)
659     result.Raise()
660     if not result.data:
661       raise errors.OpExecError("Could not disable the master role")
662     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
663     utils.CreateBackup(priv_key)
664     utils.CreateBackup(pub_key)
665     return master
666
667
668 class LUVerifyCluster(LogicalUnit):
669   """Verifies the cluster status.
670
671   """
672   HPATH = "cluster-verify"
673   HTYPE = constants.HTYPE_CLUSTER
674   _OP_REQP = ["skip_checks"]
675   REQ_BGL = False
676
677   def ExpandNames(self):
678     self.needed_locks = {
679       locking.LEVEL_NODE: locking.ALL_SET,
680       locking.LEVEL_INSTANCE: locking.ALL_SET,
681     }
682     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
683
684   def _VerifyNode(self, nodeinfo, file_list, local_cksum,
685                   node_result, feedback_fn, master_files,
686                   drbd_map, vg_name):
687     """Run multiple tests against a node.
688
689     Test list:
690
691       - compares ganeti version
692       - checks vg existance and size > 20G
693       - checks config file checksum
694       - checks ssh to other nodes
695
696     @type nodeinfo: L{objects.Node}
697     @param nodeinfo: the node to check
698     @param file_list: required list of files
699     @param local_cksum: dictionary of local files and their checksums
700     @param node_result: the results from the node
701     @param feedback_fn: function used to accumulate results
702     @param master_files: list of files that only masters should have
703     @param drbd_map: the useddrbd minors for this node, in
704         form of minor: (instance, must_exist) which correspond to instances
705         and their running status
706     @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
707
708     """
709     node = nodeinfo.name
710
711     # main result, node_result should be a non-empty dict
712     if not node_result or not isinstance(node_result, dict):
713       feedback_fn("  - ERROR: unable to verify node %s." % (node,))
714       return True
715
716     # compares ganeti version
717     local_version = constants.PROTOCOL_VERSION
718     remote_version = node_result.get('version', None)
719     if not (remote_version and isinstance(remote_version, (list, tuple)) and
720             len(remote_version) == 2):
721       feedback_fn("  - ERROR: connection to %s failed" % (node))
722       return True
723
724     if local_version != remote_version[0]:
725       feedback_fn("  - ERROR: incompatible protocol versions: master %s,"
726                   " node %s %s" % (local_version, node, remote_version[0]))
727       return True
728
729     # node seems compatible, we can actually try to look into its results
730
731     bad = False
732
733     # full package version
734     if constants.RELEASE_VERSION != remote_version[1]:
735       feedback_fn("  - WARNING: software version mismatch: master %s,"
736                   " node %s %s" %
737                   (constants.RELEASE_VERSION, node, remote_version[1]))
738
739     # checks vg existence and size > 20G
740     if vg_name is not None:
741       vglist = node_result.get(constants.NV_VGLIST, None)
742       if not vglist:
743         feedback_fn("  - ERROR: unable to check volume groups on node %s." %
744                         (node,))
745         bad = True
746       else:
747         vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
748                                               constants.MIN_VG_SIZE)
749         if vgstatus:
750           feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
751           bad = True
752
753     # checks config file checksum
754
755     remote_cksum = node_result.get(constants.NV_FILELIST, None)
756     if not isinstance(remote_cksum, dict):
757       bad = True
758       feedback_fn("  - ERROR: node hasn't returned file checksum data")
759     else:
760       for file_name in file_list:
761         node_is_mc = nodeinfo.master_candidate
762         must_have_file = file_name not in master_files
763         if file_name not in remote_cksum:
764           if node_is_mc or must_have_file:
765             bad = True
766             feedback_fn("  - ERROR: file '%s' missing" % file_name)
767         elif remote_cksum[file_name] != local_cksum[file_name]:
768           if node_is_mc or must_have_file:
769             bad = True
770             feedback_fn("  - ERROR: file '%s' has wrong checksum" % file_name)
771           else:
772             # not candidate and this is not a must-have file
773             bad = True
774             feedback_fn("  - ERROR: non master-candidate has old/wrong file"
775                         " '%s'" % file_name)
776         else:
777           # all good, except non-master/non-must have combination
778           if not node_is_mc and not must_have_file:
779             feedback_fn("  - ERROR: file '%s' should not exist on non master"
780                         " candidates" % file_name)
781
782     # checks ssh to any
783
784     if constants.NV_NODELIST not in node_result:
785       bad = True
786       feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
787     else:
788       if node_result[constants.NV_NODELIST]:
789         bad = True
790         for node in node_result[constants.NV_NODELIST]:
791           feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
792                           (node, node_result[constants.NV_NODELIST][node]))
793
794     if constants.NV_NODENETTEST not in node_result:
795       bad = True
796       feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
797     else:
798       if node_result[constants.NV_NODENETTEST]:
799         bad = True
800         nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
801         for node in nlist:
802           feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
803                           (node, node_result[constants.NV_NODENETTEST][node]))
804
805     hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
806     if isinstance(hyp_result, dict):
807       for hv_name, hv_result in hyp_result.iteritems():
808         if hv_result is not None:
809           feedback_fn("  - ERROR: hypervisor %s verify failure: '%s'" %
810                       (hv_name, hv_result))
811
812     # check used drbd list
813     if vg_name is not None:
814       used_minors = node_result.get(constants.NV_DRBDLIST, [])
815       if not isinstance(used_minors, (tuple, list)):
816         feedback_fn("  - ERROR: cannot parse drbd status file: %s" %
817                     str(used_minors))
818       else:
819         for minor, (iname, must_exist) in drbd_map.items():
820           if minor not in used_minors and must_exist:
821             feedback_fn("  - ERROR: drbd minor %d of instance %s is"
822                         " not active" % (minor, iname))
823             bad = True
824         for minor in used_minors:
825           if minor not in drbd_map:
826             feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
827                         minor)
828             bad = True
829
830     return bad
831
832   def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
833                       node_instance, feedback_fn, n_offline):
834     """Verify an instance.
835
836     This function checks to see if the required block devices are
837     available on the instance's node.
838
839     """
840     bad = False
841
842     node_current = instanceconfig.primary_node
843
844     node_vol_should = {}
845     instanceconfig.MapLVsByNode(node_vol_should)
846
847     for node in node_vol_should:
848       if node in n_offline:
849         # ignore missing volumes on offline nodes
850         continue
851       for volume in node_vol_should[node]:
852         if node not in node_vol_is or volume not in node_vol_is[node]:
853           feedback_fn("  - ERROR: volume %s missing on node %s" %
854                           (volume, node))
855           bad = True
856
857     if instanceconfig.admin_up:
858       if ((node_current not in node_instance or
859           not instance in node_instance[node_current]) and
860           node_current not in n_offline):
861         feedback_fn("  - ERROR: instance %s not running on node %s" %
862                         (instance, node_current))
863         bad = True
864
865     for node in node_instance:
866       if (not node == node_current):
867         if instance in node_instance[node]:
868           feedback_fn("  - ERROR: instance %s should not run on node %s" %
869                           (instance, node))
870           bad = True
871
872     return bad
873
874   def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is, feedback_fn):
875     """Verify if there are any unknown volumes in the cluster.
876
877     The .os, .swap and backup volumes are ignored. All other volumes are
878     reported as unknown.
879
880     """
881     bad = False
882
883     for node in node_vol_is:
884       for volume in node_vol_is[node]:
885         if node not in node_vol_should or volume not in node_vol_should[node]:
886           feedback_fn("  - ERROR: volume %s on node %s should not exist" %
887                       (volume, node))
888           bad = True
889     return bad
890
891   def _VerifyOrphanInstances(self, instancelist, node_instance, feedback_fn):
892     """Verify the list of running instances.
893
894     This checks what instances are running but unknown to the cluster.
895
896     """
897     bad = False
898     for node in node_instance:
899       for runninginstance in node_instance[node]:
900         if runninginstance not in instancelist:
901           feedback_fn("  - ERROR: instance %s on node %s should not exist" %
902                           (runninginstance, node))
903           bad = True
904     return bad
905
906   def _VerifyNPlusOneMemory(self, node_info, instance_cfg, feedback_fn):
907     """Verify N+1 Memory Resilience.
908
909     Check that if one single node dies we can still start all the instances it
910     was primary for.
911
912     """
913     bad = False
914
915     for node, nodeinfo in node_info.iteritems():
916       # This code checks that every node which is now listed as secondary has
917       # enough memory to host all instances it is supposed to should a single
918       # other node in the cluster fail.
919       # FIXME: not ready for failover to an arbitrary node
920       # FIXME: does not support file-backed instances
921       # WARNING: we currently take into account down instances as well as up
922       # ones, considering that even if they're down someone might want to start
923       # them even in the event of a node failure.
924       for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
925         needed_mem = 0
926         for instance in instances:
927           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
928           if bep[constants.BE_AUTO_BALANCE]:
929             needed_mem += bep[constants.BE_MEMORY]
930         if nodeinfo['mfree'] < needed_mem:
931           feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
932                       " failovers should node %s fail" % (node, prinode))
933           bad = True
934     return bad
935
936   def CheckPrereq(self):
937     """Check prerequisites.
938
939     Transform the list of checks we're going to skip into a set and check that
940     all its members are valid.
941
942     """
943     self.skip_set = frozenset(self.op.skip_checks)
944     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
945       raise errors.OpPrereqError("Invalid checks to be skipped specified")
946
947   def BuildHooksEnv(self):
948     """Build hooks env.
949
950     Cluster-Verify hooks just rone in the post phase and their failure makes
951     the output be logged in the verify output and the verification to fail.
952
953     """
954     all_nodes = self.cfg.GetNodeList()
955     env = {
956       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
957       }
958     for node in self.cfg.GetAllNodesInfo().values():
959       env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
960
961     return env, [], all_nodes
962
963   def Exec(self, feedback_fn):
964     """Verify integrity of cluster, performing various test on nodes.
965
966     """
967     bad = False
968     feedback_fn("* Verifying global settings")
969     for msg in self.cfg.VerifyConfig():
970       feedback_fn("  - ERROR: %s" % msg)
971
972     vg_name = self.cfg.GetVGName()
973     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
974     nodelist = utils.NiceSort(self.cfg.GetNodeList())
975     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
976     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
977     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
978                         for iname in instancelist)
979     i_non_redundant = [] # Non redundant instances
980     i_non_a_balanced = [] # Non auto-balanced instances
981     n_offline = [] # List of offline nodes
982     n_drained = [] # List of nodes being drained
983     node_volume = {}
984     node_instance = {}
985     node_info = {}
986     instance_cfg = {}
987
988     # FIXME: verify OS list
989     # do local checksums
990     master_files = [constants.CLUSTER_CONF_FILE]
991
992     file_names = ssconf.SimpleStore().GetFileList()
993     file_names.append(constants.SSL_CERT_FILE)
994     file_names.append(constants.RAPI_CERT_FILE)
995     file_names.extend(master_files)
996
997     local_checksums = utils.FingerprintFiles(file_names)
998
999     feedback_fn("* Gathering data (%d nodes)" % len(nodelist))
1000     node_verify_param = {
1001       constants.NV_FILELIST: file_names,
1002       constants.NV_NODELIST: [node.name for node in nodeinfo
1003                               if not node.offline],
1004       constants.NV_HYPERVISOR: hypervisors,
1005       constants.NV_NODENETTEST: [(node.name, node.primary_ip,
1006                                   node.secondary_ip) for node in nodeinfo
1007                                  if not node.offline],
1008       constants.NV_INSTANCELIST: hypervisors,
1009       constants.NV_VERSION: None,
1010       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
1011       }
1012     if vg_name is not None:
1013       node_verify_param[constants.NV_VGLIST] = None
1014       node_verify_param[constants.NV_LVLIST] = vg_name
1015       node_verify_param[constants.NV_DRBDLIST] = None
1016     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
1017                                            self.cfg.GetClusterName())
1018
1019     cluster = self.cfg.GetClusterInfo()
1020     master_node = self.cfg.GetMasterNode()
1021     all_drbd_map = self.cfg.ComputeDRBDMap()
1022
1023     for node_i in nodeinfo:
1024       node = node_i.name
1025
1026       if node_i.offline:
1027         feedback_fn("* Skipping offline node %s" % (node,))
1028         n_offline.append(node)
1029         continue
1030
1031       if node == master_node:
1032         ntype = "master"
1033       elif node_i.master_candidate:
1034         ntype = "master candidate"
1035       elif node_i.drained:
1036         ntype = "drained"
1037         n_drained.append(node)
1038       else:
1039         ntype = "regular"
1040       feedback_fn("* Verifying node %s (%s)" % (node, ntype))
1041
1042       msg = all_nvinfo[node].RemoteFailMsg()
1043       if msg:
1044         feedback_fn("  - ERROR: while contacting node %s: %s" % (node, msg))
1045         bad = True
1046         continue
1047
1048       nresult = all_nvinfo[node].payload
1049       node_drbd = {}
1050       for minor, instance in all_drbd_map[node].items():
1051         if instance not in instanceinfo:
1052           feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
1053                       instance)
1054           # ghost instance should not be running, but otherwise we
1055           # don't give double warnings (both ghost instance and
1056           # unallocated minor in use)
1057           node_drbd[minor] = (instance, False)
1058         else:
1059           instance = instanceinfo[instance]
1060           node_drbd[minor] = (instance.name, instance.admin_up)
1061       result = self._VerifyNode(node_i, file_names, local_checksums,
1062                                 nresult, feedback_fn, master_files,
1063                                 node_drbd, vg_name)
1064       bad = bad or result
1065
1066       lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
1067       if vg_name is None:
1068         node_volume[node] = {}
1069       elif isinstance(lvdata, basestring):
1070         feedback_fn("  - ERROR: LVM problem on node %s: %s" %
1071                     (node, utils.SafeEncode(lvdata)))
1072         bad = True
1073         node_volume[node] = {}
1074       elif not isinstance(lvdata, dict):
1075         feedback_fn("  - ERROR: connection to %s failed (lvlist)" % (node,))
1076         bad = True
1077         continue
1078       else:
1079         node_volume[node] = lvdata
1080
1081       # node_instance
1082       idata = nresult.get(constants.NV_INSTANCELIST, None)
1083       if not isinstance(idata, list):
1084         feedback_fn("  - ERROR: connection to %s failed (instancelist)" %
1085                     (node,))
1086         bad = True
1087         continue
1088
1089       node_instance[node] = idata
1090
1091       # node_info
1092       nodeinfo = nresult.get(constants.NV_HVINFO, None)
1093       if not isinstance(nodeinfo, dict):
1094         feedback_fn("  - ERROR: connection to %s failed (hvinfo)" % (node,))
1095         bad = True
1096         continue
1097
1098       try:
1099         node_info[node] = {
1100           "mfree": int(nodeinfo['memory_free']),
1101           "pinst": [],
1102           "sinst": [],
1103           # dictionary holding all instances this node is secondary for,
1104           # grouped by their primary node. Each key is a cluster node, and each
1105           # value is a list of instances which have the key as primary and the
1106           # current node as secondary.  this is handy to calculate N+1 memory
1107           # availability if you can only failover from a primary to its
1108           # secondary.
1109           "sinst-by-pnode": {},
1110         }
1111         # FIXME: devise a free space model for file based instances as well
1112         if vg_name is not None:
1113           if (constants.NV_VGLIST not in nresult or
1114               vg_name not in nresult[constants.NV_VGLIST]):
1115             feedback_fn("  - ERROR: node %s didn't return data for the"
1116                         " volume group '%s' - it is either missing or broken" %
1117                         (node, vg_name))
1118             bad = True
1119             continue
1120           node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
1121       except (ValueError, KeyError):
1122         feedback_fn("  - ERROR: invalid nodeinfo value returned"
1123                     " from node %s" % (node,))
1124         bad = True
1125         continue
1126
1127     node_vol_should = {}
1128
1129     for instance in instancelist:
1130       feedback_fn("* Verifying instance %s" % instance)
1131       inst_config = instanceinfo[instance]
1132       result =  self._VerifyInstance(instance, inst_config, node_volume,
1133                                      node_instance, feedback_fn, n_offline)
1134       bad = bad or result
1135       inst_nodes_offline = []
1136
1137       inst_config.MapLVsByNode(node_vol_should)
1138
1139       instance_cfg[instance] = inst_config
1140
1141       pnode = inst_config.primary_node
1142       if pnode in node_info:
1143         node_info[pnode]['pinst'].append(instance)
1144       elif pnode not in n_offline:
1145         feedback_fn("  - ERROR: instance %s, connection to primary node"
1146                     " %s failed" % (instance, pnode))
1147         bad = True
1148
1149       if pnode in n_offline:
1150         inst_nodes_offline.append(pnode)
1151
1152       # If the instance is non-redundant we cannot survive losing its primary
1153       # node, so we are not N+1 compliant. On the other hand we have no disk
1154       # templates with more than one secondary so that situation is not well
1155       # supported either.
1156       # FIXME: does not support file-backed instances
1157       if len(inst_config.secondary_nodes) == 0:
1158         i_non_redundant.append(instance)
1159       elif len(inst_config.secondary_nodes) > 1:
1160         feedback_fn("  - WARNING: multiple secondaries for instance %s"
1161                     % instance)
1162
1163       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
1164         i_non_a_balanced.append(instance)
1165
1166       for snode in inst_config.secondary_nodes:
1167         if snode in node_info:
1168           node_info[snode]['sinst'].append(instance)
1169           if pnode not in node_info[snode]['sinst-by-pnode']:
1170             node_info[snode]['sinst-by-pnode'][pnode] = []
1171           node_info[snode]['sinst-by-pnode'][pnode].append(instance)
1172         elif snode not in n_offline:
1173           feedback_fn("  - ERROR: instance %s, connection to secondary node"
1174                       " %s failed" % (instance, snode))
1175           bad = True
1176         if snode in n_offline:
1177           inst_nodes_offline.append(snode)
1178
1179       if inst_nodes_offline:
1180         # warn that the instance lives on offline nodes, and set bad=True
1181         feedback_fn("  - ERROR: instance lives on offline node(s) %s" %
1182                     ", ".join(inst_nodes_offline))
1183         bad = True
1184
1185     feedback_fn("* Verifying orphan volumes")
1186     result = self._VerifyOrphanVolumes(node_vol_should, node_volume,
1187                                        feedback_fn)
1188     bad = bad or result
1189
1190     feedback_fn("* Verifying remaining instances")
1191     result = self._VerifyOrphanInstances(instancelist, node_instance,
1192                                          feedback_fn)
1193     bad = bad or result
1194
1195     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
1196       feedback_fn("* Verifying N+1 Memory redundancy")
1197       result = self._VerifyNPlusOneMemory(node_info, instance_cfg, feedback_fn)
1198       bad = bad or result
1199
1200     feedback_fn("* Other Notes")
1201     if i_non_redundant:
1202       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
1203                   % len(i_non_redundant))
1204
1205     if i_non_a_balanced:
1206       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
1207                   % len(i_non_a_balanced))
1208
1209     if n_offline:
1210       feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
1211
1212     if n_drained:
1213       feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
1214
1215     return not bad
1216
1217   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
1218     """Analize the post-hooks' result
1219
1220     This method analyses the hook result, handles it, and sends some
1221     nicely-formatted feedback back to the user.
1222
1223     @param phase: one of L{constants.HOOKS_PHASE_POST} or
1224         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
1225     @param hooks_results: the results of the multi-node hooks rpc call
1226     @param feedback_fn: function used send feedback back to the caller
1227     @param lu_result: previous Exec result
1228     @return: the new Exec result, based on the previous result
1229         and hook results
1230
1231     """
1232     # We only really run POST phase hooks, and are only interested in
1233     # their results
1234     if phase == constants.HOOKS_PHASE_POST:
1235       # Used to change hooks' output to proper indentation
1236       indent_re = re.compile('^', re.M)
1237       feedback_fn("* Hooks Results")
1238       if not hooks_results:
1239         feedback_fn("  - ERROR: general communication failure")
1240         lu_result = 1
1241       else:
1242         for node_name in hooks_results:
1243           show_node_header = True
1244           res = hooks_results[node_name]
1245           if res.failed or res.data is False or not isinstance(res.data, list):
1246             if res.offline:
1247               # no need to warn or set fail return value
1248               continue
1249             feedback_fn("    Communication failure in hooks execution")
1250             lu_result = 1
1251             continue
1252           for script, hkr, output in res.data:
1253             if hkr == constants.HKR_FAIL:
1254               # The node header is only shown once, if there are
1255               # failing hooks on that node
1256               if show_node_header:
1257                 feedback_fn("  Node %s:" % node_name)
1258                 show_node_header = False
1259               feedback_fn("    ERROR: Script %s failed, output:" % script)
1260               output = indent_re.sub('      ', output)
1261               feedback_fn("%s" % output)
1262               lu_result = 1
1263
1264       return lu_result
1265
1266
1267 class LUVerifyDisks(NoHooksLU):
1268   """Verifies the cluster disks status.
1269
1270   """
1271   _OP_REQP = []
1272   REQ_BGL = False
1273
1274   def ExpandNames(self):
1275     self.needed_locks = {
1276       locking.LEVEL_NODE: locking.ALL_SET,
1277       locking.LEVEL_INSTANCE: locking.ALL_SET,
1278     }
1279     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
1280
1281   def CheckPrereq(self):
1282     """Check prerequisites.
1283
1284     This has no prerequisites.
1285
1286     """
1287     pass
1288
1289   def Exec(self, feedback_fn):
1290     """Verify integrity of cluster disks.
1291
1292     @rtype: tuple of three items
1293     @return: a tuple of (dict of node-to-node_error, list of instances
1294         which need activate-disks, dict of instance: (node, volume) for
1295         missing volumes
1296
1297     """
1298     result = res_nodes, res_instances, res_missing = {}, [], {}
1299
1300     vg_name = self.cfg.GetVGName()
1301     nodes = utils.NiceSort(self.cfg.GetNodeList())
1302     instances = [self.cfg.GetInstanceInfo(name)
1303                  for name in self.cfg.GetInstanceList()]
1304
1305     nv_dict = {}
1306     for inst in instances:
1307       inst_lvs = {}
1308       if (not inst.admin_up or
1309           inst.disk_template not in constants.DTS_NET_MIRROR):
1310         continue
1311       inst.MapLVsByNode(inst_lvs)
1312       # transform { iname: {node: [vol,],},} to {(node, vol): iname}
1313       for node, vol_list in inst_lvs.iteritems():
1314         for vol in vol_list:
1315           nv_dict[(node, vol)] = inst
1316
1317     if not nv_dict:
1318       return result
1319
1320     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
1321
1322     to_act = set()
1323     for node in nodes:
1324       # node_volume
1325       node_res = node_lvs[node]
1326       if node_res.offline:
1327         continue
1328       msg = node_res.RemoteFailMsg()
1329       if msg:
1330         logging.warning("Error enumerating LVs on node %s: %s", node, msg)
1331         res_nodes[node] = msg
1332         continue
1333
1334       lvs = node_res.payload
1335       for lv_name, (_, lv_inactive, lv_online) in lvs.items():
1336         inst = nv_dict.pop((node, lv_name), None)
1337         if (not lv_online and inst is not None
1338             and inst.name not in res_instances):
1339           res_instances.append(inst.name)
1340
1341     # any leftover items in nv_dict are missing LVs, let's arrange the
1342     # data better
1343     for key, inst in nv_dict.iteritems():
1344       if inst.name not in res_missing:
1345         res_missing[inst.name] = []
1346       res_missing[inst.name].append(key)
1347
1348     return result
1349
1350
1351 class LURenameCluster(LogicalUnit):
1352   """Rename the cluster.
1353
1354   """
1355   HPATH = "cluster-rename"
1356   HTYPE = constants.HTYPE_CLUSTER
1357   _OP_REQP = ["name"]
1358
1359   def BuildHooksEnv(self):
1360     """Build hooks env.
1361
1362     """
1363     env = {
1364       "OP_TARGET": self.cfg.GetClusterName(),
1365       "NEW_NAME": self.op.name,
1366       }
1367     mn = self.cfg.GetMasterNode()
1368     return env, [mn], [mn]
1369
1370   def CheckPrereq(self):
1371     """Verify that the passed name is a valid one.
1372
1373     """
1374     hostname = utils.HostInfo(self.op.name)
1375
1376     new_name = hostname.name
1377     self.ip = new_ip = hostname.ip
1378     old_name = self.cfg.GetClusterName()
1379     old_ip = self.cfg.GetMasterIP()
1380     if new_name == old_name and new_ip == old_ip:
1381       raise errors.OpPrereqError("Neither the name nor the IP address of the"
1382                                  " cluster has changed")
1383     if new_ip != old_ip:
1384       if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
1385         raise errors.OpPrereqError("The given cluster IP address (%s) is"
1386                                    " reachable on the network. Aborting." %
1387                                    new_ip)
1388
1389     self.op.name = new_name
1390
1391   def Exec(self, feedback_fn):
1392     """Rename the cluster.
1393
1394     """
1395     clustername = self.op.name
1396     ip = self.ip
1397
1398     # shutdown the master IP
1399     master = self.cfg.GetMasterNode()
1400     result = self.rpc.call_node_stop_master(master, False)
1401     if result.failed or not result.data:
1402       raise errors.OpExecError("Could not disable the master role")
1403
1404     try:
1405       cluster = self.cfg.GetClusterInfo()
1406       cluster.cluster_name = clustername
1407       cluster.master_ip = ip
1408       self.cfg.Update(cluster)
1409
1410       # update the known hosts file
1411       ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE)
1412       node_list = self.cfg.GetNodeList()
1413       try:
1414         node_list.remove(master)
1415       except ValueError:
1416         pass
1417       result = self.rpc.call_upload_file(node_list,
1418                                          constants.SSH_KNOWN_HOSTS_FILE)
1419       for to_node, to_result in result.iteritems():
1420          msg = to_result.RemoteFailMsg()
1421          if msg:
1422            msg = ("Copy of file %s to node %s failed: %s" %
1423                    (constants.SSH_KNOWN_HOSTS_FILE, to_node, msg))
1424            self.proc.LogWarning(msg)
1425
1426     finally:
1427       result = self.rpc.call_node_start_master(master, False)
1428       msg = result.RemoteFailMsg()
1429       if msg:
1430         self.LogWarning("Could not re-enable the master role on"
1431                         " the master, please restart manually: %s", msg)
1432
1433
1434 def _RecursiveCheckIfLVMBased(disk):
1435   """Check if the given disk or its children are lvm-based.
1436
1437   @type disk: L{objects.Disk}
1438   @param disk: the disk to check
1439   @rtype: booleean
1440   @return: boolean indicating whether a LD_LV dev_type was found or not
1441
1442   """
1443   if disk.children:
1444     for chdisk in disk.children:
1445       if _RecursiveCheckIfLVMBased(chdisk):
1446         return True
1447   return disk.dev_type == constants.LD_LV
1448
1449
1450 class LUSetClusterParams(LogicalUnit):
1451   """Change the parameters of the cluster.
1452
1453   """
1454   HPATH = "cluster-modify"
1455   HTYPE = constants.HTYPE_CLUSTER
1456   _OP_REQP = []
1457   REQ_BGL = False
1458
1459   def CheckArguments(self):
1460     """Check parameters
1461
1462     """
1463     if not hasattr(self.op, "candidate_pool_size"):
1464       self.op.candidate_pool_size = None
1465     if self.op.candidate_pool_size is not None:
1466       try:
1467         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
1468       except (ValueError, TypeError), err:
1469         raise errors.OpPrereqError("Invalid candidate_pool_size value: %s" %
1470                                    str(err))
1471       if self.op.candidate_pool_size < 1:
1472         raise errors.OpPrereqError("At least one master candidate needed")
1473
1474   def ExpandNames(self):
1475     # FIXME: in the future maybe other cluster params won't require checking on
1476     # all nodes to be modified.
1477     self.needed_locks = {
1478       locking.LEVEL_NODE: locking.ALL_SET,
1479     }
1480     self.share_locks[locking.LEVEL_NODE] = 1
1481
1482   def BuildHooksEnv(self):
1483     """Build hooks env.
1484
1485     """
1486     env = {
1487       "OP_TARGET": self.cfg.GetClusterName(),
1488       "NEW_VG_NAME": self.op.vg_name,
1489       }
1490     mn = self.cfg.GetMasterNode()
1491     return env, [mn], [mn]
1492
1493   def CheckPrereq(self):
1494     """Check prerequisites.
1495
1496     This checks whether the given params don't conflict and
1497     if the given volume group is valid.
1498
1499     """
1500     if self.op.vg_name is not None and not self.op.vg_name:
1501       instances = self.cfg.GetAllInstancesInfo().values()
1502       for inst in instances:
1503         for disk in inst.disks:
1504           if _RecursiveCheckIfLVMBased(disk):
1505             raise errors.OpPrereqError("Cannot disable lvm storage while"
1506                                        " lvm-based instances exist")
1507
1508     node_list = self.acquired_locks[locking.LEVEL_NODE]
1509
1510     # if vg_name not None, checks given volume group on all nodes
1511     if self.op.vg_name:
1512       vglist = self.rpc.call_vg_list(node_list)
1513       for node in node_list:
1514         msg = vglist[node].RemoteFailMsg()
1515         if msg:
1516           # ignoring down node
1517           self.LogWarning("Error while gathering data on node %s"
1518                           " (ignoring node): %s", node, msg)
1519           continue
1520         vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
1521                                               self.op.vg_name,
1522                                               constants.MIN_VG_SIZE)
1523         if vgstatus:
1524           raise errors.OpPrereqError("Error on node '%s': %s" %
1525                                      (node, vgstatus))
1526
1527     self.cluster = cluster = self.cfg.GetClusterInfo()
1528     # validate params changes
1529     if self.op.beparams:
1530       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1531       self.new_beparams = objects.FillDict(
1532         cluster.beparams[constants.PP_DEFAULT], self.op.beparams)
1533
1534     if self.op.nicparams:
1535       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1536       self.new_nicparams = objects.FillDict(
1537         cluster.nicparams[constants.PP_DEFAULT], self.op.nicparams)
1538       objects.NIC.CheckParameterSyntax(self.new_nicparams)
1539
1540     # hypervisor list/parameters
1541     self.new_hvparams = objects.FillDict(cluster.hvparams, {})
1542     if self.op.hvparams:
1543       if not isinstance(self.op.hvparams, dict):
1544         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input")
1545       for hv_name, hv_dict in self.op.hvparams.items():
1546         if hv_name not in self.new_hvparams:
1547           self.new_hvparams[hv_name] = hv_dict
1548         else:
1549           self.new_hvparams[hv_name].update(hv_dict)
1550
1551     if self.op.enabled_hypervisors is not None:
1552       self.hv_list = self.op.enabled_hypervisors
1553     else:
1554       self.hv_list = cluster.enabled_hypervisors
1555
1556     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1557       # either the enabled list has changed, or the parameters have, validate
1558       for hv_name, hv_params in self.new_hvparams.items():
1559         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1560             (self.op.enabled_hypervisors and
1561              hv_name in self.op.enabled_hypervisors)):
1562           # either this is a new hypervisor, or its parameters have changed
1563           hv_class = hypervisor.GetHypervisor(hv_name)
1564           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1565           hv_class.CheckParameterSyntax(hv_params)
1566           _CheckHVParams(self, node_list, hv_name, hv_params)
1567
1568   def Exec(self, feedback_fn):
1569     """Change the parameters of the cluster.
1570
1571     """
1572     if self.op.vg_name is not None:
1573       new_volume = self.op.vg_name
1574       if not new_volume:
1575         new_volume = None
1576       if new_volume != self.cfg.GetVGName():
1577         self.cfg.SetVGName(new_volume)
1578       else:
1579         feedback_fn("Cluster LVM configuration already in desired"
1580                     " state, not changing")
1581     if self.op.hvparams:
1582       self.cluster.hvparams = self.new_hvparams
1583     if self.op.enabled_hypervisors is not None:
1584       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1585     if self.op.beparams:
1586       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1587     if self.op.nicparams:
1588       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1589
1590     if self.op.candidate_pool_size is not None:
1591       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1592
1593     self.cfg.Update(self.cluster)
1594
1595     # we want to update nodes after the cluster so that if any errors
1596     # happen, we have recorded and saved the cluster info
1597     if self.op.candidate_pool_size is not None:
1598       _AdjustCandidatePool(self)
1599
1600
1601 def _RedistributeAncillaryFiles(lu, additional_nodes=None):
1602   """Distribute additional files which are part of the cluster configuration.
1603
1604   ConfigWriter takes care of distributing the config and ssconf files, but
1605   there are more files which should be distributed to all nodes. This function
1606   makes sure those are copied.
1607
1608   @param lu: calling logical unit
1609   @param additional_nodes: list of nodes not in the config to distribute to
1610
1611   """
1612   # 1. Gather target nodes
1613   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
1614   dist_nodes = lu.cfg.GetNodeList()
1615   if additional_nodes is not None:
1616     dist_nodes.extend(additional_nodes)
1617   if myself.name in dist_nodes:
1618     dist_nodes.remove(myself.name)
1619   # 2. Gather files to distribute
1620   dist_files = set([constants.ETC_HOSTS,
1621                     constants.SSH_KNOWN_HOSTS_FILE,
1622                     constants.RAPI_CERT_FILE,
1623                     constants.RAPI_USERS_FILE,
1624                    ])
1625
1626   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
1627   for hv_name in enabled_hypervisors:
1628     hv_class = hypervisor.GetHypervisor(hv_name)
1629     dist_files.update(hv_class.GetAncillaryFiles())
1630
1631   # 3. Perform the files upload
1632   for fname in dist_files:
1633     if os.path.exists(fname):
1634       result = lu.rpc.call_upload_file(dist_nodes, fname)
1635       for to_node, to_result in result.items():
1636          msg = to_result.RemoteFailMsg()
1637          if msg:
1638            msg = ("Copy of file %s to node %s failed: %s" %
1639                    (fname, to_node, msg))
1640            lu.proc.LogWarning(msg)
1641
1642
1643 class LURedistributeConfig(NoHooksLU):
1644   """Force the redistribution of cluster configuration.
1645
1646   This is a very simple LU.
1647
1648   """
1649   _OP_REQP = []
1650   REQ_BGL = False
1651
1652   def ExpandNames(self):
1653     self.needed_locks = {
1654       locking.LEVEL_NODE: locking.ALL_SET,
1655     }
1656     self.share_locks[locking.LEVEL_NODE] = 1
1657
1658   def CheckPrereq(self):
1659     """Check prerequisites.
1660
1661     """
1662
1663   def Exec(self, feedback_fn):
1664     """Redistribute the configuration.
1665
1666     """
1667     self.cfg.Update(self.cfg.GetClusterInfo())
1668     _RedistributeAncillaryFiles(self)
1669
1670
1671 def _WaitForSync(lu, instance, oneshot=False, unlock=False):
1672   """Sleep and poll for an instance's disk to sync.
1673
1674   """
1675   if not instance.disks:
1676     return True
1677
1678   if not oneshot:
1679     lu.proc.LogInfo("Waiting for instance %s to sync disks." % instance.name)
1680
1681   node = instance.primary_node
1682
1683   for dev in instance.disks:
1684     lu.cfg.SetDiskID(dev, node)
1685
1686   retries = 0
1687   while True:
1688     max_time = 0
1689     done = True
1690     cumul_degraded = False
1691     rstats = lu.rpc.call_blockdev_getmirrorstatus(node, instance.disks)
1692     msg = rstats.RemoteFailMsg()
1693     if msg:
1694       lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1695       retries += 1
1696       if retries >= 10:
1697         raise errors.RemoteError("Can't contact node %s for mirror data,"
1698                                  " aborting." % node)
1699       time.sleep(6)
1700       continue
1701     rstats = rstats.payload
1702     retries = 0
1703     for i, mstat in enumerate(rstats):
1704       if mstat is None:
1705         lu.LogWarning("Can't compute data for node %s/%s",
1706                            node, instance.disks[i].iv_name)
1707         continue
1708       # we ignore the ldisk parameter
1709       perc_done, est_time, is_degraded, _ = mstat
1710       cumul_degraded = cumul_degraded or (is_degraded and perc_done is None)
1711       if perc_done is not None:
1712         done = False
1713         if est_time is not None:
1714           rem_time = "%d estimated seconds remaining" % est_time
1715           max_time = est_time
1716         else:
1717           rem_time = "no time estimate"
1718         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
1719                         (instance.disks[i].iv_name, perc_done, rem_time))
1720     if done or oneshot:
1721       break
1722
1723     time.sleep(min(60, max_time))
1724
1725   if done:
1726     lu.proc.LogInfo("Instance %s's disks are in sync." % instance.name)
1727   return not cumul_degraded
1728
1729
1730 def _CheckDiskConsistency(lu, dev, node, on_primary, ldisk=False):
1731   """Check that mirrors are not degraded.
1732
1733   The ldisk parameter, if True, will change the test from the
1734   is_degraded attribute (which represents overall non-ok status for
1735   the device(s)) to the ldisk (representing the local storage status).
1736
1737   """
1738   lu.cfg.SetDiskID(dev, node)
1739   if ldisk:
1740     idx = 6
1741   else:
1742     idx = 5
1743
1744   result = True
1745   if on_primary or dev.AssembleOnSecondary():
1746     rstats = lu.rpc.call_blockdev_find(node, dev)
1747     msg = rstats.RemoteFailMsg()
1748     if msg:
1749       lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1750       result = False
1751     elif not rstats.payload:
1752       lu.LogWarning("Can't find disk on node %s", node)
1753       result = False
1754     else:
1755       result = result and (not rstats.payload[idx])
1756   if dev.children:
1757     for child in dev.children:
1758       result = result and _CheckDiskConsistency(lu, child, node, on_primary)
1759
1760   return result
1761
1762
1763 class LUDiagnoseOS(NoHooksLU):
1764   """Logical unit for OS diagnose/query.
1765
1766   """
1767   _OP_REQP = ["output_fields", "names"]
1768   REQ_BGL = False
1769   _FIELDS_STATIC = utils.FieldSet()
1770   _FIELDS_DYNAMIC = utils.FieldSet("name", "valid", "node_status")
1771
1772   def ExpandNames(self):
1773     if self.op.names:
1774       raise errors.OpPrereqError("Selective OS query not supported")
1775
1776     _CheckOutputFields(static=self._FIELDS_STATIC,
1777                        dynamic=self._FIELDS_DYNAMIC,
1778                        selected=self.op.output_fields)
1779
1780     # Lock all nodes, in shared mode
1781     # Temporary removal of locks, should be reverted later
1782     # TODO: reintroduce locks when they are lighter-weight
1783     self.needed_locks = {}
1784     #self.share_locks[locking.LEVEL_NODE] = 1
1785     #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1786
1787   def CheckPrereq(self):
1788     """Check prerequisites.
1789
1790     """
1791
1792   @staticmethod
1793   def _DiagnoseByOS(node_list, rlist):
1794     """Remaps a per-node return list into an a per-os per-node dictionary
1795
1796     @param node_list: a list with the names of all nodes
1797     @param rlist: a map with node names as keys and OS objects as values
1798
1799     @rtype: dict
1800     @return: a dictionary with osnames as keys and as value another map, with
1801         nodes as keys and list of OS objects as values, eg::
1802
1803           {"debian-etch": {"node1": [<object>,...],
1804                            "node2": [<object>,]}
1805           }
1806
1807     """
1808     all_os = {}
1809     # we build here the list of nodes that didn't fail the RPC (at RPC
1810     # level), so that nodes with a non-responding node daemon don't
1811     # make all OSes invalid
1812     good_nodes = [node_name for node_name in rlist
1813                   if not rlist[node_name].failed]
1814     for node_name, nr in rlist.iteritems():
1815       if nr.failed or not nr.data:
1816         continue
1817       for os_obj in nr.data:
1818         if os_obj.name not in all_os:
1819           # build a list of nodes for this os containing empty lists
1820           # for each node in node_list
1821           all_os[os_obj.name] = {}
1822           for nname in good_nodes:
1823             all_os[os_obj.name][nname] = []
1824         all_os[os_obj.name][node_name].append(os_obj)
1825     return all_os
1826
1827   def Exec(self, feedback_fn):
1828     """Compute the list of OSes.
1829
1830     """
1831     valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
1832     node_data = self.rpc.call_os_diagnose(valid_nodes)
1833     if node_data == False:
1834       raise errors.OpExecError("Can't gather the list of OSes")
1835     pol = self._DiagnoseByOS(valid_nodes, node_data)
1836     output = []
1837     for os_name, os_data in pol.iteritems():
1838       row = []
1839       for field in self.op.output_fields:
1840         if field == "name":
1841           val = os_name
1842         elif field == "valid":
1843           val = utils.all([osl and osl[0] for osl in os_data.values()])
1844         elif field == "node_status":
1845           val = {}
1846           for node_name, nos_list in os_data.iteritems():
1847             val[node_name] = [(v.status, v.path) for v in nos_list]
1848         else:
1849           raise errors.ParameterError(field)
1850         row.append(val)
1851       output.append(row)
1852
1853     return output
1854
1855
1856 class LURemoveNode(LogicalUnit):
1857   """Logical unit for removing a node.
1858
1859   """
1860   HPATH = "node-remove"
1861   HTYPE = constants.HTYPE_NODE
1862   _OP_REQP = ["node_name"]
1863
1864   def BuildHooksEnv(self):
1865     """Build hooks env.
1866
1867     This doesn't run on the target node in the pre phase as a failed
1868     node would then be impossible to remove.
1869
1870     """
1871     env = {
1872       "OP_TARGET": self.op.node_name,
1873       "NODE_NAME": self.op.node_name,
1874       }
1875     all_nodes = self.cfg.GetNodeList()
1876     all_nodes.remove(self.op.node_name)
1877     return env, all_nodes, all_nodes
1878
1879   def CheckPrereq(self):
1880     """Check prerequisites.
1881
1882     This checks:
1883      - the node exists in the configuration
1884      - it does not have primary or secondary instances
1885      - it's not the master
1886
1887     Any errors are signalled by raising errors.OpPrereqError.
1888
1889     """
1890     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
1891     if node is None:
1892       raise errors.OpPrereqError, ("Node '%s' is unknown." % self.op.node_name)
1893
1894     instance_list = self.cfg.GetInstanceList()
1895
1896     masternode = self.cfg.GetMasterNode()
1897     if node.name == masternode:
1898       raise errors.OpPrereqError("Node is the master node,"
1899                                  " you need to failover first.")
1900
1901     for instance_name in instance_list:
1902       instance = self.cfg.GetInstanceInfo(instance_name)
1903       if node.name in instance.all_nodes:
1904         raise errors.OpPrereqError("Instance %s is still running on the node,"
1905                                    " please remove first." % instance_name)
1906     self.op.node_name = node.name
1907     self.node = node
1908
1909   def Exec(self, feedback_fn):
1910     """Removes the node from the cluster.
1911
1912     """
1913     node = self.node
1914     logging.info("Stopping the node daemon and removing configs from node %s",
1915                  node.name)
1916
1917     self.context.RemoveNode(node.name)
1918
1919     self.rpc.call_node_leave_cluster(node.name)
1920
1921     # Promote nodes to master candidate as needed
1922     _AdjustCandidatePool(self)
1923
1924
1925 class LUQueryNodes(NoHooksLU):
1926   """Logical unit for querying nodes.
1927
1928   """
1929   _OP_REQP = ["output_fields", "names", "use_locking"]
1930   REQ_BGL = False
1931   _FIELDS_DYNAMIC = utils.FieldSet(
1932     "dtotal", "dfree",
1933     "mtotal", "mnode", "mfree",
1934     "bootid",
1935     "ctotal", "cnodes", "csockets",
1936     )
1937
1938   _FIELDS_STATIC = utils.FieldSet(
1939     "name", "pinst_cnt", "sinst_cnt",
1940     "pinst_list", "sinst_list",
1941     "pip", "sip", "tags",
1942     "serial_no",
1943     "master_candidate",
1944     "master",
1945     "offline",
1946     "drained",
1947     )
1948
1949   def ExpandNames(self):
1950     _CheckOutputFields(static=self._FIELDS_STATIC,
1951                        dynamic=self._FIELDS_DYNAMIC,
1952                        selected=self.op.output_fields)
1953
1954     self.needed_locks = {}
1955     self.share_locks[locking.LEVEL_NODE] = 1
1956
1957     if self.op.names:
1958       self.wanted = _GetWantedNodes(self, self.op.names)
1959     else:
1960       self.wanted = locking.ALL_SET
1961
1962     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
1963     self.do_locking = self.do_node_query and self.op.use_locking
1964     if self.do_locking:
1965       # if we don't request only static fields, we need to lock the nodes
1966       self.needed_locks[locking.LEVEL_NODE] = self.wanted
1967
1968
1969   def CheckPrereq(self):
1970     """Check prerequisites.
1971
1972     """
1973     # The validation of the node list is done in the _GetWantedNodes,
1974     # if non empty, and if empty, there's no validation to do
1975     pass
1976
1977   def Exec(self, feedback_fn):
1978     """Computes the list of nodes and their attributes.
1979
1980     """
1981     all_info = self.cfg.GetAllNodesInfo()
1982     if self.do_locking:
1983       nodenames = self.acquired_locks[locking.LEVEL_NODE]
1984     elif self.wanted != locking.ALL_SET:
1985       nodenames = self.wanted
1986       missing = set(nodenames).difference(all_info.keys())
1987       if missing:
1988         raise errors.OpExecError(
1989           "Some nodes were removed before retrieving their data: %s" % missing)
1990     else:
1991       nodenames = all_info.keys()
1992
1993     nodenames = utils.NiceSort(nodenames)
1994     nodelist = [all_info[name] for name in nodenames]
1995
1996     # begin data gathering
1997
1998     if self.do_node_query:
1999       live_data = {}
2000       node_data = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
2001                                           self.cfg.GetHypervisorType())
2002       for name in nodenames:
2003         nodeinfo = node_data[name]
2004         if not nodeinfo.RemoteFailMsg() and nodeinfo.payload:
2005           nodeinfo = nodeinfo.payload
2006           fn = utils.TryConvert
2007           live_data[name] = {
2008             "mtotal": fn(int, nodeinfo.get('memory_total', None)),
2009             "mnode": fn(int, nodeinfo.get('memory_dom0', None)),
2010             "mfree": fn(int, nodeinfo.get('memory_free', None)),
2011             "dtotal": fn(int, nodeinfo.get('vg_size', None)),
2012             "dfree": fn(int, nodeinfo.get('vg_free', None)),
2013             "ctotal": fn(int, nodeinfo.get('cpu_total', None)),
2014             "bootid": nodeinfo.get('bootid', None),
2015             "cnodes": fn(int, nodeinfo.get('cpu_nodes', None)),
2016             "csockets": fn(int, nodeinfo.get('cpu_sockets', None)),
2017             }
2018         else:
2019           live_data[name] = {}
2020     else:
2021       live_data = dict.fromkeys(nodenames, {})
2022
2023     node_to_primary = dict([(name, set()) for name in nodenames])
2024     node_to_secondary = dict([(name, set()) for name in nodenames])
2025
2026     inst_fields = frozenset(("pinst_cnt", "pinst_list",
2027                              "sinst_cnt", "sinst_list"))
2028     if inst_fields & frozenset(self.op.output_fields):
2029       instancelist = self.cfg.GetInstanceList()
2030
2031       for instance_name in instancelist:
2032         inst = self.cfg.GetInstanceInfo(instance_name)
2033         if inst.primary_node in node_to_primary:
2034           node_to_primary[inst.primary_node].add(inst.name)
2035         for secnode in inst.secondary_nodes:
2036           if secnode in node_to_secondary:
2037             node_to_secondary[secnode].add(inst.name)
2038
2039     master_node = self.cfg.GetMasterNode()
2040
2041     # end data gathering
2042
2043     output = []
2044     for node in nodelist:
2045       node_output = []
2046       for field in self.op.output_fields:
2047         if field == "name":
2048           val = node.name
2049         elif field == "pinst_list":
2050           val = list(node_to_primary[node.name])
2051         elif field == "sinst_list":
2052           val = list(node_to_secondary[node.name])
2053         elif field == "pinst_cnt":
2054           val = len(node_to_primary[node.name])
2055         elif field == "sinst_cnt":
2056           val = len(node_to_secondary[node.name])
2057         elif field == "pip":
2058           val = node.primary_ip
2059         elif field == "sip":
2060           val = node.secondary_ip
2061         elif field == "tags":
2062           val = list(node.GetTags())
2063         elif field == "serial_no":
2064           val = node.serial_no
2065         elif field == "master_candidate":
2066           val = node.master_candidate
2067         elif field == "master":
2068           val = node.name == master_node
2069         elif field == "offline":
2070           val = node.offline
2071         elif field == "drained":
2072           val = node.drained
2073         elif self._FIELDS_DYNAMIC.Matches(field):
2074           val = live_data[node.name].get(field, None)
2075         else:
2076           raise errors.ParameterError(field)
2077         node_output.append(val)
2078       output.append(node_output)
2079
2080     return output
2081
2082
2083 class LUQueryNodeVolumes(NoHooksLU):
2084   """Logical unit for getting volumes on node(s).
2085
2086   """
2087   _OP_REQP = ["nodes", "output_fields"]
2088   REQ_BGL = False
2089   _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
2090   _FIELDS_STATIC = utils.FieldSet("node")
2091
2092   def ExpandNames(self):
2093     _CheckOutputFields(static=self._FIELDS_STATIC,
2094                        dynamic=self._FIELDS_DYNAMIC,
2095                        selected=self.op.output_fields)
2096
2097     self.needed_locks = {}
2098     self.share_locks[locking.LEVEL_NODE] = 1
2099     if not self.op.nodes:
2100       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2101     else:
2102       self.needed_locks[locking.LEVEL_NODE] = \
2103         _GetWantedNodes(self, self.op.nodes)
2104
2105   def CheckPrereq(self):
2106     """Check prerequisites.
2107
2108     This checks that the fields required are valid output fields.
2109
2110     """
2111     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
2112
2113   def Exec(self, feedback_fn):
2114     """Computes the list of nodes and their attributes.
2115
2116     """
2117     nodenames = self.nodes
2118     volumes = self.rpc.call_node_volumes(nodenames)
2119
2120     ilist = [self.cfg.GetInstanceInfo(iname) for iname
2121              in self.cfg.GetInstanceList()]
2122
2123     lv_by_node = dict([(inst, inst.MapLVsByNode()) for inst in ilist])
2124
2125     output = []
2126     for node in nodenames:
2127       if node not in volumes or volumes[node].failed or not volumes[node].data:
2128         continue
2129
2130       node_vols = volumes[node].data[:]
2131       node_vols.sort(key=lambda vol: vol['dev'])
2132
2133       for vol in node_vols:
2134         node_output = []
2135         for field in self.op.output_fields:
2136           if field == "node":
2137             val = node
2138           elif field == "phys":
2139             val = vol['dev']
2140           elif field == "vg":
2141             val = vol['vg']
2142           elif field == "name":
2143             val = vol['name']
2144           elif field == "size":
2145             val = int(float(vol['size']))
2146           elif field == "instance":
2147             for inst in ilist:
2148               if node not in lv_by_node[inst]:
2149                 continue
2150               if vol['name'] in lv_by_node[inst][node]:
2151                 val = inst.name
2152                 break
2153             else:
2154               val = '-'
2155           else:
2156             raise errors.ParameterError(field)
2157           node_output.append(str(val))
2158
2159         output.append(node_output)
2160
2161     return output
2162
2163
2164 class LUAddNode(LogicalUnit):
2165   """Logical unit for adding node to the cluster.
2166
2167   """
2168   HPATH = "node-add"
2169   HTYPE = constants.HTYPE_NODE
2170   _OP_REQP = ["node_name"]
2171
2172   def BuildHooksEnv(self):
2173     """Build hooks env.
2174
2175     This will run on all nodes before, and on all nodes + the new node after.
2176
2177     """
2178     env = {
2179       "OP_TARGET": self.op.node_name,
2180       "NODE_NAME": self.op.node_name,
2181       "NODE_PIP": self.op.primary_ip,
2182       "NODE_SIP": self.op.secondary_ip,
2183       }
2184     nodes_0 = self.cfg.GetNodeList()
2185     nodes_1 = nodes_0 + [self.op.node_name, ]
2186     return env, nodes_0, nodes_1
2187
2188   def CheckPrereq(self):
2189     """Check prerequisites.
2190
2191     This checks:
2192      - the new node is not already in the config
2193      - it is resolvable
2194      - its parameters (single/dual homed) matches the cluster
2195
2196     Any errors are signalled by raising errors.OpPrereqError.
2197
2198     """
2199     node_name = self.op.node_name
2200     cfg = self.cfg
2201
2202     dns_data = utils.HostInfo(node_name)
2203
2204     node = dns_data.name
2205     primary_ip = self.op.primary_ip = dns_data.ip
2206     secondary_ip = getattr(self.op, "secondary_ip", None)
2207     if secondary_ip is None:
2208       secondary_ip = primary_ip
2209     if not utils.IsValidIP(secondary_ip):
2210       raise errors.OpPrereqError("Invalid secondary IP given")
2211     self.op.secondary_ip = secondary_ip
2212
2213     node_list = cfg.GetNodeList()
2214     if not self.op.readd and node in node_list:
2215       raise errors.OpPrereqError("Node %s is already in the configuration" %
2216                                  node)
2217     elif self.op.readd and node not in node_list:
2218       raise errors.OpPrereqError("Node %s is not in the configuration" % node)
2219
2220     for existing_node_name in node_list:
2221       existing_node = cfg.GetNodeInfo(existing_node_name)
2222
2223       if self.op.readd and node == existing_node_name:
2224         if (existing_node.primary_ip != primary_ip or
2225             existing_node.secondary_ip != secondary_ip):
2226           raise errors.OpPrereqError("Readded node doesn't have the same IP"
2227                                      " address configuration as before")
2228         continue
2229
2230       if (existing_node.primary_ip == primary_ip or
2231           existing_node.secondary_ip == primary_ip or
2232           existing_node.primary_ip == secondary_ip or
2233           existing_node.secondary_ip == secondary_ip):
2234         raise errors.OpPrereqError("New node ip address(es) conflict with"
2235                                    " existing node %s" % existing_node.name)
2236
2237     # check that the type of the node (single versus dual homed) is the
2238     # same as for the master
2239     myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
2240     master_singlehomed = myself.secondary_ip == myself.primary_ip
2241     newbie_singlehomed = secondary_ip == primary_ip
2242     if master_singlehomed != newbie_singlehomed:
2243       if master_singlehomed:
2244         raise errors.OpPrereqError("The master has no private ip but the"
2245                                    " new node has one")
2246       else:
2247         raise errors.OpPrereqError("The master has a private ip but the"
2248                                    " new node doesn't have one")
2249
2250     # checks reachablity
2251     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
2252       raise errors.OpPrereqError("Node not reachable by ping")
2253
2254     if not newbie_singlehomed:
2255       # check reachability from my secondary ip to newbie's secondary ip
2256       if not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
2257                            source=myself.secondary_ip):
2258         raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
2259                                    " based ping to noded port")
2260
2261     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2262     mc_now, _ = self.cfg.GetMasterCandidateStats()
2263     master_candidate = mc_now < cp_size
2264
2265     self.new_node = objects.Node(name=node,
2266                                  primary_ip=primary_ip,
2267                                  secondary_ip=secondary_ip,
2268                                  master_candidate=master_candidate,
2269                                  offline=False, drained=False)
2270
2271   def Exec(self, feedback_fn):
2272     """Adds the new node to the cluster.
2273
2274     """
2275     new_node = self.new_node
2276     node = new_node.name
2277
2278     # check connectivity
2279     result = self.rpc.call_version([node])[node]
2280     result.Raise()
2281     if result.data:
2282       if constants.PROTOCOL_VERSION == result.data:
2283         logging.info("Communication to node %s fine, sw version %s match",
2284                      node, result.data)
2285       else:
2286         raise errors.OpExecError("Version mismatch master version %s,"
2287                                  " node version %s" %
2288                                  (constants.PROTOCOL_VERSION, result.data))
2289     else:
2290       raise errors.OpExecError("Cannot get version from the new node")
2291
2292     # setup ssh on node
2293     logging.info("Copy ssh key to node %s", node)
2294     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
2295     keyarray = []
2296     keyfiles = [constants.SSH_HOST_DSA_PRIV, constants.SSH_HOST_DSA_PUB,
2297                 constants.SSH_HOST_RSA_PRIV, constants.SSH_HOST_RSA_PUB,
2298                 priv_key, pub_key]
2299
2300     for i in keyfiles:
2301       f = open(i, 'r')
2302       try:
2303         keyarray.append(f.read())
2304       finally:
2305         f.close()
2306
2307     result = self.rpc.call_node_add(node, keyarray[0], keyarray[1],
2308                                     keyarray[2],
2309                                     keyarray[3], keyarray[4], keyarray[5])
2310
2311     msg = result.RemoteFailMsg()
2312     if msg:
2313       raise errors.OpExecError("Cannot transfer ssh keys to the"
2314                                " new node: %s" % msg)
2315
2316     # Add node to our /etc/hosts, and add key to known_hosts
2317     if self.cfg.GetClusterInfo().modify_etc_hosts:
2318       utils.AddHostToEtcHosts(new_node.name)
2319
2320     if new_node.secondary_ip != new_node.primary_ip:
2321       result = self.rpc.call_node_has_ip_address(new_node.name,
2322                                                  new_node.secondary_ip)
2323       msg = result.RemoteFailMsg()
2324       if msg:
2325         raise errors.OpPrereqError("Failure checking secondary ip"
2326                                    " on node %s: %s" % (new_node.name, msg))
2327       if not result.payload:
2328         raise errors.OpExecError("Node claims it doesn't have the secondary ip"
2329                                  " you gave (%s). Please fix and re-run this"
2330                                  " command." % new_node.secondary_ip)
2331
2332     node_verify_list = [self.cfg.GetMasterNode()]
2333     node_verify_param = {
2334       'nodelist': [node],
2335       # TODO: do a node-net-test as well?
2336     }
2337
2338     result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
2339                                        self.cfg.GetClusterName())
2340     for verifier in node_verify_list:
2341       msg = result[verifier].RemoteFailMsg()
2342       if msg:
2343         raise errors.OpExecError("Cannot communicate with node %s: %s" %
2344                                  (verifier, msg))
2345       nl_payload = result[verifier].payload['nodelist']
2346       if nl_payload:
2347         for failed in nl_payload:
2348           feedback_fn("ssh/hostname verification failed %s -> %s" %
2349                       (verifier, nl_payload[failed]))
2350         raise errors.OpExecError("ssh/hostname verification failed.")
2351
2352     if self.op.readd:
2353       _RedistributeAncillaryFiles(self)
2354       self.context.ReaddNode(new_node)
2355     else:
2356       _RedistributeAncillaryFiles(self, additional_nodes=[node])
2357       self.context.AddNode(new_node)
2358
2359
2360 class LUSetNodeParams(LogicalUnit):
2361   """Modifies the parameters of a node.
2362
2363   """
2364   HPATH = "node-modify"
2365   HTYPE = constants.HTYPE_NODE
2366   _OP_REQP = ["node_name"]
2367   REQ_BGL = False
2368
2369   def CheckArguments(self):
2370     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2371     if node_name is None:
2372       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2373     self.op.node_name = node_name
2374     _CheckBooleanOpField(self.op, 'master_candidate')
2375     _CheckBooleanOpField(self.op, 'offline')
2376     _CheckBooleanOpField(self.op, 'drained')
2377     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
2378     if all_mods.count(None) == 3:
2379       raise errors.OpPrereqError("Please pass at least one modification")
2380     if all_mods.count(True) > 1:
2381       raise errors.OpPrereqError("Can't set the node into more than one"
2382                                  " state at the same time")
2383
2384   def ExpandNames(self):
2385     self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
2386
2387   def BuildHooksEnv(self):
2388     """Build hooks env.
2389
2390     This runs on the master node.
2391
2392     """
2393     env = {
2394       "OP_TARGET": self.op.node_name,
2395       "MASTER_CANDIDATE": str(self.op.master_candidate),
2396       "OFFLINE": str(self.op.offline),
2397       "DRAINED": str(self.op.drained),
2398       }
2399     nl = [self.cfg.GetMasterNode(),
2400           self.op.node_name]
2401     return env, nl, nl
2402
2403   def CheckPrereq(self):
2404     """Check prerequisites.
2405
2406     This only checks the instance list against the existing names.
2407
2408     """
2409     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
2410
2411     if ((self.op.master_candidate == False or self.op.offline == True or
2412          self.op.drained == True) and node.master_candidate):
2413       # we will demote the node from master_candidate
2414       if self.op.node_name == self.cfg.GetMasterNode():
2415         raise errors.OpPrereqError("The master node has to be a"
2416                                    " master candidate, online and not drained")
2417       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
2418       num_candidates, _ = self.cfg.GetMasterCandidateStats()
2419       if num_candidates <= cp_size:
2420         msg = ("Not enough master candidates (desired"
2421                " %d, new value will be %d)" % (cp_size, num_candidates-1))
2422         if self.op.force:
2423           self.LogWarning(msg)
2424         else:
2425           raise errors.OpPrereqError(msg)
2426
2427     if (self.op.master_candidate == True and
2428         ((node.offline and not self.op.offline == False) or
2429          (node.drained and not self.op.drained == False))):
2430       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
2431                                  " to master_candidate" % node.name)
2432
2433     return
2434
2435   def Exec(self, feedback_fn):
2436     """Modifies a node.
2437
2438     """
2439     node = self.node
2440
2441     result = []
2442     changed_mc = False
2443
2444     if self.op.offline is not None:
2445       node.offline = self.op.offline
2446       result.append(("offline", str(self.op.offline)))
2447       if self.op.offline == True:
2448         if node.master_candidate:
2449           node.master_candidate = False
2450           changed_mc = True
2451           result.append(("master_candidate", "auto-demotion due to offline"))
2452         if node.drained:
2453           node.drained = False
2454           result.append(("drained", "clear drained status due to offline"))
2455
2456     if self.op.master_candidate is not None:
2457       node.master_candidate = self.op.master_candidate
2458       changed_mc = True
2459       result.append(("master_candidate", str(self.op.master_candidate)))
2460       if self.op.master_candidate == False:
2461         rrc = self.rpc.call_node_demote_from_mc(node.name)
2462         msg = rrc.RemoteFailMsg()
2463         if msg:
2464           self.LogWarning("Node failed to demote itself: %s" % msg)
2465
2466     if self.op.drained is not None:
2467       node.drained = self.op.drained
2468       result.append(("drained", str(self.op.drained)))
2469       if self.op.drained == True:
2470         if node.master_candidate:
2471           node.master_candidate = False
2472           changed_mc = True
2473           result.append(("master_candidate", "auto-demotion due to drain"))
2474         if node.offline:
2475           node.offline = False
2476           result.append(("offline", "clear offline status due to drain"))
2477
2478     # this will trigger configuration file update, if needed
2479     self.cfg.Update(node)
2480     # this will trigger job queue propagation or cleanup
2481     if changed_mc:
2482       self.context.ReaddNode(node)
2483
2484     return result
2485
2486
2487 class LUPowercycleNode(NoHooksLU):
2488   """Powercycles a node.
2489
2490   """
2491   _OP_REQP = ["node_name", "force"]
2492   REQ_BGL = False
2493
2494   def CheckArguments(self):
2495     node_name = self.cfg.ExpandNodeName(self.op.node_name)
2496     if node_name is None:
2497       raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name)
2498     self.op.node_name = node_name
2499     if node_name == self.cfg.GetMasterNode() and not self.op.force:
2500       raise errors.OpPrereqError("The node is the master and the force"
2501                                  " parameter was not set")
2502
2503   def ExpandNames(self):
2504     """Locking for PowercycleNode.
2505
2506     This is a last-resource option and shouldn't block on other
2507     jobs. Therefore, we grab no locks.
2508
2509     """
2510     self.needed_locks = {}
2511
2512   def CheckPrereq(self):
2513     """Check prerequisites.
2514
2515     This LU has no prereqs.
2516
2517     """
2518     pass
2519
2520   def Exec(self, feedback_fn):
2521     """Reboots a node.
2522
2523     """
2524     result = self.rpc.call_node_powercycle(self.op.node_name,
2525                                            self.cfg.GetHypervisorType())
2526     msg = result.RemoteFailMsg()
2527     if msg:
2528       raise errors.OpExecError("Failed to schedule the reboot: %s" % msg)
2529     return result.payload
2530
2531
2532 class LUQueryClusterInfo(NoHooksLU):
2533   """Query cluster configuration.
2534
2535   """
2536   _OP_REQP = []
2537   REQ_BGL = False
2538
2539   def ExpandNames(self):
2540     self.needed_locks = {}
2541
2542   def CheckPrereq(self):
2543     """No prerequsites needed for this LU.
2544
2545     """
2546     pass
2547
2548   def Exec(self, feedback_fn):
2549     """Return cluster config.
2550
2551     """
2552     cluster = self.cfg.GetClusterInfo()
2553     result = {
2554       "software_version": constants.RELEASE_VERSION,
2555       "protocol_version": constants.PROTOCOL_VERSION,
2556       "config_version": constants.CONFIG_VERSION,
2557       "os_api_version": constants.OS_API_VERSION,
2558       "export_version": constants.EXPORT_VERSION,
2559       "architecture": (platform.architecture()[0], platform.machine()),
2560       "name": cluster.cluster_name,
2561       "master": cluster.master_node,
2562       "default_hypervisor": cluster.default_hypervisor,
2563       "enabled_hypervisors": cluster.enabled_hypervisors,
2564       "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
2565                         for hypervisor in cluster.enabled_hypervisors]),
2566       "beparams": cluster.beparams,
2567       "nicparams": cluster.nicparams,
2568       "candidate_pool_size": cluster.candidate_pool_size,
2569       "master_netdev": cluster.master_netdev,
2570       "volume_group_name": cluster.volume_group_name,
2571       "file_storage_dir": cluster.file_storage_dir,
2572       }
2573
2574     return result
2575
2576
2577 class LUQueryConfigValues(NoHooksLU):
2578   """Return configuration values.
2579
2580   """
2581   _OP_REQP = []
2582   REQ_BGL = False
2583   _FIELDS_DYNAMIC = utils.FieldSet()
2584   _FIELDS_STATIC = utils.FieldSet("cluster_name", "master_node", "drain_flag")
2585
2586   def ExpandNames(self):
2587     self.needed_locks = {}
2588
2589     _CheckOutputFields(static=self._FIELDS_STATIC,
2590                        dynamic=self._FIELDS_DYNAMIC,
2591                        selected=self.op.output_fields)
2592
2593   def CheckPrereq(self):
2594     """No prerequisites.
2595
2596     """
2597     pass
2598
2599   def Exec(self, feedback_fn):
2600     """Dump a representation of the cluster config to the standard output.
2601
2602     """
2603     values = []
2604     for field in self.op.output_fields:
2605       if field == "cluster_name":
2606         entry = self.cfg.GetClusterName()
2607       elif field == "master_node":
2608         entry = self.cfg.GetMasterNode()
2609       elif field == "drain_flag":
2610         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
2611       else:
2612         raise errors.ParameterError(field)
2613       values.append(entry)
2614     return values
2615
2616
2617 class LUActivateInstanceDisks(NoHooksLU):
2618   """Bring up an instance's disks.
2619
2620   """
2621   _OP_REQP = ["instance_name"]
2622   REQ_BGL = False
2623
2624   def ExpandNames(self):
2625     self._ExpandAndLockInstance()
2626     self.needed_locks[locking.LEVEL_NODE] = []
2627     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2628
2629   def DeclareLocks(self, level):
2630     if level == locking.LEVEL_NODE:
2631       self._LockInstancesNodes()
2632
2633   def CheckPrereq(self):
2634     """Check prerequisites.
2635
2636     This checks that the instance is in the cluster.
2637
2638     """
2639     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2640     assert self.instance is not None, \
2641       "Cannot retrieve locked instance %s" % self.op.instance_name
2642     _CheckNodeOnline(self, self.instance.primary_node)
2643
2644   def Exec(self, feedback_fn):
2645     """Activate the disks.
2646
2647     """
2648     disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
2649     if not disks_ok:
2650       raise errors.OpExecError("Cannot activate block devices")
2651
2652     return disks_info
2653
2654
2655 def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
2656   """Prepare the block devices for an instance.
2657
2658   This sets up the block devices on all nodes.
2659
2660   @type lu: L{LogicalUnit}
2661   @param lu: the logical unit on whose behalf we execute
2662   @type instance: L{objects.Instance}
2663   @param instance: the instance for whose disks we assemble
2664   @type ignore_secondaries: boolean
2665   @param ignore_secondaries: if true, errors on secondary nodes
2666       won't result in an error return from the function
2667   @return: False if the operation failed, otherwise a list of
2668       (host, instance_visible_name, node_visible_name)
2669       with the mapping from node devices to instance devices
2670
2671   """
2672   device_info = []
2673   disks_ok = True
2674   iname = instance.name
2675   # With the two passes mechanism we try to reduce the window of
2676   # opportunity for the race condition of switching DRBD to primary
2677   # before handshaking occured, but we do not eliminate it
2678
2679   # The proper fix would be to wait (with some limits) until the
2680   # connection has been made and drbd transitions from WFConnection
2681   # into any other network-connected state (Connected, SyncTarget,
2682   # SyncSource, etc.)
2683
2684   # 1st pass, assemble on all nodes in secondary mode
2685   for inst_disk in instance.disks:
2686     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2687       lu.cfg.SetDiskID(node_disk, node)
2688       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
2689       msg = result.RemoteFailMsg()
2690       if msg:
2691         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2692                            " (is_primary=False, pass=1): %s",
2693                            inst_disk.iv_name, node, msg)
2694         if not ignore_secondaries:
2695           disks_ok = False
2696
2697   # FIXME: race condition on drbd migration to primary
2698
2699   # 2nd pass, do only the primary node
2700   for inst_disk in instance.disks:
2701     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
2702       if node != instance.primary_node:
2703         continue
2704       lu.cfg.SetDiskID(node_disk, node)
2705       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
2706       msg = result.RemoteFailMsg()
2707       if msg:
2708         lu.proc.LogWarning("Could not prepare block device %s on node %s"
2709                            " (is_primary=True, pass=2): %s",
2710                            inst_disk.iv_name, node, msg)
2711         disks_ok = False
2712     device_info.append((instance.primary_node, inst_disk.iv_name,
2713                         result.payload))
2714
2715   # leave the disks configured for the primary node
2716   # this is a workaround that would be fixed better by
2717   # improving the logical/physical id handling
2718   for disk in instance.disks:
2719     lu.cfg.SetDiskID(disk, instance.primary_node)
2720
2721   return disks_ok, device_info
2722
2723
2724 def _StartInstanceDisks(lu, instance, force):
2725   """Start the disks of an instance.
2726
2727   """
2728   disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
2729                                            ignore_secondaries=force)
2730   if not disks_ok:
2731     _ShutdownInstanceDisks(lu, instance)
2732     if force is not None and not force:
2733       lu.proc.LogWarning("", hint="If the message above refers to a"
2734                          " secondary node,"
2735                          " you can retry the operation using '--force'.")
2736     raise errors.OpExecError("Disk consistency error")
2737
2738
2739 class LUDeactivateInstanceDisks(NoHooksLU):
2740   """Shutdown an instance's disks.
2741
2742   """
2743   _OP_REQP = ["instance_name"]
2744   REQ_BGL = False
2745
2746   def ExpandNames(self):
2747     self._ExpandAndLockInstance()
2748     self.needed_locks[locking.LEVEL_NODE] = []
2749     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2750
2751   def DeclareLocks(self, level):
2752     if level == locking.LEVEL_NODE:
2753       self._LockInstancesNodes()
2754
2755   def CheckPrereq(self):
2756     """Check prerequisites.
2757
2758     This checks that the instance is in the cluster.
2759
2760     """
2761     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2762     assert self.instance is not None, \
2763       "Cannot retrieve locked instance %s" % self.op.instance_name
2764
2765   def Exec(self, feedback_fn):
2766     """Deactivate the disks
2767
2768     """
2769     instance = self.instance
2770     _SafeShutdownInstanceDisks(self, instance)
2771
2772
2773 def _SafeShutdownInstanceDisks(lu, instance):
2774   """Shutdown block devices of an instance.
2775
2776   This function checks if an instance is running, before calling
2777   _ShutdownInstanceDisks.
2778
2779   """
2780   pnode = instance.primary_node
2781   ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])
2782   ins_l = ins_l[pnode]
2783   msg = ins_l.RemoteFailMsg()
2784   if msg:
2785     raise errors.OpExecError("Can't contact node %s: %s" % (pnode, msg))
2786
2787   if instance.name in ins_l.payload:
2788     raise errors.OpExecError("Instance is running, can't shutdown"
2789                              " block devices.")
2790
2791   _ShutdownInstanceDisks(lu, instance)
2792
2793
2794 def _ShutdownInstanceDisks(lu, instance, ignore_primary=False):
2795   """Shutdown block devices of an instance.
2796
2797   This does the shutdown on all nodes of the instance.
2798
2799   If the ignore_primary is false, errors on the primary node are
2800   ignored.
2801
2802   """
2803   all_result = True
2804   for disk in instance.disks:
2805     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
2806       lu.cfg.SetDiskID(top_disk, node)
2807       result = lu.rpc.call_blockdev_shutdown(node, top_disk)
2808       msg = result.RemoteFailMsg()
2809       if msg:
2810         lu.LogWarning("Could not shutdown block device %s on node %s: %s",
2811                       disk.iv_name, node, msg)
2812         if not ignore_primary or node != instance.primary_node:
2813           all_result = False
2814   return all_result
2815
2816
2817 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
2818   """Checks if a node has enough free memory.
2819
2820   This function check if a given node has the needed amount of free
2821   memory. In case the node has less memory or we cannot get the
2822   information from the node, this function raise an OpPrereqError
2823   exception.
2824
2825   @type lu: C{LogicalUnit}
2826   @param lu: a logical unit from which we get configuration data
2827   @type node: C{str}
2828   @param node: the node to check
2829   @type reason: C{str}
2830   @param reason: string to use in the error message
2831   @type requested: C{int}
2832   @param requested: the amount of memory in MiB to check for
2833   @type hypervisor_name: C{str}
2834   @param hypervisor_name: the hypervisor to ask for memory stats
2835   @raise errors.OpPrereqError: if the node doesn't have enough memory, or
2836       we cannot check the node
2837
2838   """
2839   nodeinfo = lu.rpc.call_node_info([node], lu.cfg.GetVGName(), hypervisor_name)
2840   msg = nodeinfo[node].RemoteFailMsg()
2841   if msg:
2842     raise errors.OpPrereqError("Can't get data from node %s: %s" % (node, msg))
2843   free_mem = nodeinfo[node].payload.get('memory_free', None)
2844   if not isinstance(free_mem, int):
2845     raise errors.OpPrereqError("Can't compute free memory on node %s, result"
2846                                " was '%s'" % (node, free_mem))
2847   if requested > free_mem:
2848     raise errors.OpPrereqError("Not enough memory on node %s for %s:"
2849                                " needed %s MiB, available %s MiB" %
2850                                (node, reason, requested, free_mem))
2851
2852
2853 class LUStartupInstance(LogicalUnit):
2854   """Starts an instance.
2855
2856   """
2857   HPATH = "instance-start"
2858   HTYPE = constants.HTYPE_INSTANCE
2859   _OP_REQP = ["instance_name", "force"]
2860   REQ_BGL = False
2861
2862   def ExpandNames(self):
2863     self._ExpandAndLockInstance()
2864
2865   def BuildHooksEnv(self):
2866     """Build hooks env.
2867
2868     This runs on master, primary and secondary nodes of the instance.
2869
2870     """
2871     env = {
2872       "FORCE": self.op.force,
2873       }
2874     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2875     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2876     return env, nl, nl
2877
2878   def CheckPrereq(self):
2879     """Check prerequisites.
2880
2881     This checks that the instance is in the cluster.
2882
2883     """
2884     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2885     assert self.instance is not None, \
2886       "Cannot retrieve locked instance %s" % self.op.instance_name
2887
2888     # extra beparams
2889     self.beparams = getattr(self.op, "beparams", {})
2890     if self.beparams:
2891       if not isinstance(self.beparams, dict):
2892         raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
2893                                    " dict" % (type(self.beparams), ))
2894       # fill the beparams dict
2895       utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
2896       self.op.beparams = self.beparams
2897
2898     # extra hvparams
2899     self.hvparams = getattr(self.op, "hvparams", {})
2900     if self.hvparams:
2901       if not isinstance(self.hvparams, dict):
2902         raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
2903                                    " dict" % (type(self.hvparams), ))
2904
2905       # check hypervisor parameter syntax (locally)
2906       cluster = self.cfg.GetClusterInfo()
2907       utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
2908       filled_hvp = objects.FillDict(cluster.hvparams[instance.hypervisor],
2909                                     instance.hvparams)
2910       filled_hvp.update(self.hvparams)
2911       hv_type = hypervisor.GetHypervisor(instance.hypervisor)
2912       hv_type.CheckParameterSyntax(filled_hvp)
2913       _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
2914       self.op.hvparams = self.hvparams
2915
2916     _CheckNodeOnline(self, instance.primary_node)
2917
2918     bep = self.cfg.GetClusterInfo().FillBE(instance)
2919     # check bridges existance
2920     _CheckInstanceBridgesExist(self, instance)
2921
2922     remote_info = self.rpc.call_instance_info(instance.primary_node,
2923                                               instance.name,
2924                                               instance.hypervisor)
2925     msg = remote_info.RemoteFailMsg()
2926     if msg:
2927       raise errors.OpPrereqError("Error checking node %s: %s" %
2928                                  (instance.primary_node, msg))
2929     if not remote_info.payload: # not running already
2930       _CheckNodeFreeMemory(self, instance.primary_node,
2931                            "starting instance %s" % instance.name,
2932                            bep[constants.BE_MEMORY], instance.hypervisor)
2933
2934   def Exec(self, feedback_fn):
2935     """Start the instance.
2936
2937     """
2938     instance = self.instance
2939     force = self.op.force
2940
2941     self.cfg.MarkInstanceUp(instance.name)
2942
2943     node_current = instance.primary_node
2944
2945     _StartInstanceDisks(self, instance, force)
2946
2947     result = self.rpc.call_instance_start(node_current, instance,
2948                                           self.hvparams, self.beparams)
2949     msg = result.RemoteFailMsg()
2950     if msg:
2951       _ShutdownInstanceDisks(self, instance)
2952       raise errors.OpExecError("Could not start instance: %s" % msg)
2953
2954
2955 class LURebootInstance(LogicalUnit):
2956   """Reboot an instance.
2957
2958   """
2959   HPATH = "instance-reboot"
2960   HTYPE = constants.HTYPE_INSTANCE
2961   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
2962   REQ_BGL = False
2963
2964   def ExpandNames(self):
2965     if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
2966                                    constants.INSTANCE_REBOOT_HARD,
2967                                    constants.INSTANCE_REBOOT_FULL]:
2968       raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
2969                                   (constants.INSTANCE_REBOOT_SOFT,
2970                                    constants.INSTANCE_REBOOT_HARD,
2971                                    constants.INSTANCE_REBOOT_FULL))
2972     self._ExpandAndLockInstance()
2973
2974   def BuildHooksEnv(self):
2975     """Build hooks env.
2976
2977     This runs on master, primary and secondary nodes of the instance.
2978
2979     """
2980     env = {
2981       "IGNORE_SECONDARIES": self.op.ignore_secondaries,
2982       "REBOOT_TYPE": self.op.reboot_type,
2983       }
2984     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2985     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2986     return env, nl, nl
2987
2988   def CheckPrereq(self):
2989     """Check prerequisites.
2990
2991     This checks that the instance is in the cluster.
2992
2993     """
2994     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2995     assert self.instance is not None, \
2996       "Cannot retrieve locked instance %s" % self.op.instance_name
2997
2998     _CheckNodeOnline(self, instance.primary_node)
2999
3000     # check bridges existance
3001     _CheckInstanceBridgesExist(self, instance)
3002
3003   def Exec(self, feedback_fn):
3004     """Reboot the instance.
3005
3006     """
3007     instance = self.instance
3008     ignore_secondaries = self.op.ignore_secondaries
3009     reboot_type = self.op.reboot_type
3010
3011     node_current = instance.primary_node
3012
3013     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
3014                        constants.INSTANCE_REBOOT_HARD]:
3015       for disk in instance.disks:
3016         self.cfg.SetDiskID(disk, node_current)
3017       result = self.rpc.call_instance_reboot(node_current, instance,
3018                                              reboot_type)
3019       msg = result.RemoteFailMsg()
3020       if msg:
3021         raise errors.OpExecError("Could not reboot instance: %s" % msg)
3022     else:
3023       result = self.rpc.call_instance_shutdown(node_current, instance)
3024       msg = result.RemoteFailMsg()
3025       if msg:
3026         raise errors.OpExecError("Could not shutdown instance for"
3027                                  " full reboot: %s" % msg)
3028       _ShutdownInstanceDisks(self, instance)
3029       _StartInstanceDisks(self, instance, ignore_secondaries)
3030       result = self.rpc.call_instance_start(node_current, instance, None, None)
3031       msg = result.RemoteFailMsg()
3032       if msg:
3033         _ShutdownInstanceDisks(self, instance)
3034         raise errors.OpExecError("Could not start instance for"
3035                                  " full reboot: %s" % msg)
3036
3037     self.cfg.MarkInstanceUp(instance.name)
3038
3039
3040 class LUShutdownInstance(LogicalUnit):
3041   """Shutdown an instance.
3042
3043   """
3044   HPATH = "instance-stop"
3045   HTYPE = constants.HTYPE_INSTANCE
3046   _OP_REQP = ["instance_name"]
3047   REQ_BGL = False
3048
3049   def ExpandNames(self):
3050     self._ExpandAndLockInstance()
3051
3052   def BuildHooksEnv(self):
3053     """Build hooks env.
3054
3055     This runs on master, primary and secondary nodes of the instance.
3056
3057     """
3058     env = _BuildInstanceHookEnvByObject(self, self.instance)
3059     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3060     return env, nl, nl
3061
3062   def CheckPrereq(self):
3063     """Check prerequisites.
3064
3065     This checks that the instance is in the cluster.
3066
3067     """
3068     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3069     assert self.instance is not None, \
3070       "Cannot retrieve locked instance %s" % self.op.instance_name
3071     _CheckNodeOnline(self, self.instance.primary_node)
3072
3073   def Exec(self, feedback_fn):
3074     """Shutdown the instance.
3075
3076     """
3077     instance = self.instance
3078     node_current = instance.primary_node
3079     self.cfg.MarkInstanceDown(instance.name)
3080     result = self.rpc.call_instance_shutdown(node_current, instance)
3081     msg = result.RemoteFailMsg()
3082     if msg:
3083       self.proc.LogWarning("Could not shutdown instance: %s" % msg)
3084
3085     _ShutdownInstanceDisks(self, instance)
3086
3087
3088 class LUReinstallInstance(LogicalUnit):
3089   """Reinstall an instance.
3090
3091   """
3092   HPATH = "instance-reinstall"
3093   HTYPE = constants.HTYPE_INSTANCE
3094   _OP_REQP = ["instance_name"]
3095   REQ_BGL = False
3096
3097   def ExpandNames(self):
3098     self._ExpandAndLockInstance()
3099
3100   def BuildHooksEnv(self):
3101     """Build hooks env.
3102
3103     This runs on master, primary and secondary nodes of the instance.
3104
3105     """
3106     env = _BuildInstanceHookEnvByObject(self, self.instance)
3107     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3108     return env, nl, nl
3109
3110   def CheckPrereq(self):
3111     """Check prerequisites.
3112
3113     This checks that the instance is in the cluster and is not running.
3114
3115     """
3116     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3117     assert instance is not None, \
3118       "Cannot retrieve locked instance %s" % self.op.instance_name
3119     _CheckNodeOnline(self, instance.primary_node)
3120
3121     if instance.disk_template == constants.DT_DISKLESS:
3122       raise errors.OpPrereqError("Instance '%s' has no disks" %
3123                                  self.op.instance_name)
3124     if instance.admin_up:
3125       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3126                                  self.op.instance_name)
3127     remote_info = self.rpc.call_instance_info(instance.primary_node,
3128                                               instance.name,
3129                                               instance.hypervisor)
3130     msg = remote_info.RemoteFailMsg()
3131     if msg:
3132       raise errors.OpPrereqError("Error checking node %s: %s" %
3133                                  (instance.primary_node, msg))
3134     if remote_info.payload:
3135       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3136                                  (self.op.instance_name,
3137                                   instance.primary_node))
3138
3139     self.op.os_type = getattr(self.op, "os_type", None)
3140     if self.op.os_type is not None:
3141       # OS verification
3142       pnode = self.cfg.GetNodeInfo(
3143         self.cfg.ExpandNodeName(instance.primary_node))
3144       if pnode is None:
3145         raise errors.OpPrereqError("Primary node '%s' is unknown" %
3146                                    self.op.pnode)
3147       result = self.rpc.call_os_get(pnode.name, self.op.os_type)
3148       result.Raise()
3149       if not isinstance(result.data, objects.OS):
3150         raise errors.OpPrereqError("OS '%s' not in supported OS list for"
3151                                    " primary node"  % self.op.os_type)
3152
3153     self.instance = instance
3154
3155   def Exec(self, feedback_fn):
3156     """Reinstall the instance.
3157
3158     """
3159     inst = self.instance
3160
3161     if self.op.os_type is not None:
3162       feedback_fn("Changing OS to '%s'..." % self.op.os_type)
3163       inst.os = self.op.os_type
3164       self.cfg.Update(inst)
3165
3166     _StartInstanceDisks(self, inst, None)
3167     try:
3168       feedback_fn("Running the instance OS create scripts...")
3169       result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
3170       msg = result.RemoteFailMsg()
3171       if msg:
3172         raise errors.OpExecError("Could not install OS for instance %s"
3173                                  " on node %s: %s" %
3174                                  (inst.name, inst.primary_node, msg))
3175     finally:
3176       _ShutdownInstanceDisks(self, inst)
3177
3178
3179 class LURenameInstance(LogicalUnit):
3180   """Rename an instance.
3181
3182   """
3183   HPATH = "instance-rename"
3184   HTYPE = constants.HTYPE_INSTANCE
3185   _OP_REQP = ["instance_name", "new_name"]
3186
3187   def BuildHooksEnv(self):
3188     """Build hooks env.
3189
3190     This runs on master, primary and secondary nodes of the instance.
3191
3192     """
3193     env = _BuildInstanceHookEnvByObject(self, self.instance)
3194     env["INSTANCE_NEW_NAME"] = self.op.new_name
3195     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
3196     return env, nl, nl
3197
3198   def CheckPrereq(self):
3199     """Check prerequisites.
3200
3201     This checks that the instance is in the cluster and is not running.
3202
3203     """
3204     instance = self.cfg.GetInstanceInfo(
3205       self.cfg.ExpandInstanceName(self.op.instance_name))
3206     if instance is None:
3207       raise errors.OpPrereqError("Instance '%s' not known" %
3208                                  self.op.instance_name)
3209     _CheckNodeOnline(self, instance.primary_node)
3210
3211     if instance.admin_up:
3212       raise errors.OpPrereqError("Instance '%s' is marked to be up" %
3213                                  self.op.instance_name)
3214     remote_info = self.rpc.call_instance_info(instance.primary_node,
3215                                               instance.name,
3216                                               instance.hypervisor)
3217     msg = remote_info.RemoteFailMsg()
3218     if msg:
3219       raise errors.OpPrereqError("Error checking node %s: %s" %
3220                                  (instance.primary_node, msg))
3221     if remote_info.payload:
3222       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
3223                                  (self.op.instance_name,
3224                                   instance.primary_node))
3225     self.instance = instance
3226
3227     # new name verification
3228     name_info = utils.HostInfo(self.op.new_name)
3229
3230     self.op.new_name = new_name = name_info.name
3231     instance_list = self.cfg.GetInstanceList()
3232     if new_name in instance_list:
3233       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
3234                                  new_name)
3235
3236     if not getattr(self.op, "ignore_ip", False):
3237       if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
3238         raise errors.OpPrereqError("IP %s of instance %s already in use" %
3239                                    (name_info.ip, new_name))
3240
3241
3242   def Exec(self, feedback_fn):
3243     """Reinstall the instance.
3244
3245     """
3246     inst = self.instance
3247     old_name = inst.name
3248
3249     if inst.disk_template == constants.DT_FILE:
3250       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3251
3252     self.cfg.RenameInstance(inst.name, self.op.new_name)
3253     # Change the instance lock. This is definitely safe while we hold the BGL
3254     self.context.glm.remove(locking.LEVEL_INSTANCE, old_name)
3255     self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
3256
3257     # re-read the instance from the configuration after rename
3258     inst = self.cfg.GetInstanceInfo(self.op.new_name)
3259
3260     if inst.disk_template == constants.DT_FILE:
3261       new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
3262       result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
3263                                                      old_file_storage_dir,
3264                                                      new_file_storage_dir)
3265       result.Raise()
3266       if not result.data:
3267         raise errors.OpExecError("Could not connect to node '%s' to rename"
3268                                  " directory '%s' to '%s' (but the instance"
3269                                  " has been renamed in Ganeti)" % (
3270                                  inst.primary_node, old_file_storage_dir,
3271                                  new_file_storage_dir))
3272
3273       if not result.data[0]:
3274         raise errors.OpExecError("Could not rename directory '%s' to '%s'"
3275                                  " (but the instance has been renamed in"
3276                                  " Ganeti)" % (old_file_storage_dir,
3277                                                new_file_storage_dir))
3278
3279     _StartInstanceDisks(self, inst, None)
3280     try:
3281       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
3282                                                  old_name)
3283       msg = result.RemoteFailMsg()
3284       if msg:
3285         msg = ("Could not run OS rename script for instance %s on node %s"
3286                " (but the instance has been renamed in Ganeti): %s" %
3287                (inst.name, inst.primary_node, msg))
3288         self.proc.LogWarning(msg)
3289     finally:
3290       _ShutdownInstanceDisks(self, inst)
3291
3292
3293 class LURemoveInstance(LogicalUnit):
3294   """Remove an instance.
3295
3296   """
3297   HPATH = "instance-remove"
3298   HTYPE = constants.HTYPE_INSTANCE
3299   _OP_REQP = ["instance_name", "ignore_failures"]
3300   REQ_BGL = False
3301
3302   def ExpandNames(self):
3303     self._ExpandAndLockInstance()
3304     self.needed_locks[locking.LEVEL_NODE] = []
3305     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3306
3307   def DeclareLocks(self, level):
3308     if level == locking.LEVEL_NODE:
3309       self._LockInstancesNodes()
3310
3311   def BuildHooksEnv(self):
3312     """Build hooks env.
3313
3314     This runs on master, primary and secondary nodes of the instance.
3315
3316     """
3317     env = _BuildInstanceHookEnvByObject(self, self.instance)
3318     nl = [self.cfg.GetMasterNode()]
3319     return env, nl, nl
3320
3321   def CheckPrereq(self):
3322     """Check prerequisites.
3323
3324     This checks that the instance is in the cluster.
3325
3326     """
3327     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3328     assert self.instance is not None, \
3329       "Cannot retrieve locked instance %s" % self.op.instance_name
3330
3331   def Exec(self, feedback_fn):
3332     """Remove the instance.
3333
3334     """
3335     instance = self.instance
3336     logging.info("Shutting down instance %s on node %s",
3337                  instance.name, instance.primary_node)
3338
3339     result = self.rpc.call_instance_shutdown(instance.primary_node, instance)
3340     msg = result.RemoteFailMsg()
3341     if msg:
3342       if self.op.ignore_failures:
3343         feedback_fn("Warning: can't shutdown instance: %s" % msg)
3344       else:
3345         raise errors.OpExecError("Could not shutdown instance %s on"
3346                                  " node %s: %s" %
3347                                  (instance.name, instance.primary_node, msg))
3348
3349     logging.info("Removing block devices for instance %s", instance.name)
3350
3351     if not _RemoveDisks(self, instance):
3352       if self.op.ignore_failures:
3353         feedback_fn("Warning: can't remove instance's disks")
3354       else:
3355         raise errors.OpExecError("Can't remove instance's disks")
3356
3357     logging.info("Removing instance %s out of cluster config", instance.name)
3358
3359     self.cfg.RemoveInstance(instance.name)
3360     self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
3361
3362
3363 class LUQueryInstances(NoHooksLU):
3364   """Logical unit for querying instances.
3365
3366   """
3367   _OP_REQP = ["output_fields", "names", "use_locking"]
3368   REQ_BGL = False
3369   _FIELDS_STATIC = utils.FieldSet(*["name", "os", "pnode", "snodes",
3370                                     "admin_state",
3371                                     "disk_template", "ip", "mac", "bridge",
3372                                     "sda_size", "sdb_size", "vcpus", "tags",
3373                                     "network_port", "beparams",
3374                                     r"(disk)\.(size)/([0-9]+)",
3375                                     r"(disk)\.(sizes)", "disk_usage",
3376                                     r"(nic)\.(mac|ip|bridge)/([0-9]+)",
3377                                     r"(nic)\.(macs|ips|bridges)",
3378                                     r"(disk|nic)\.(count)",
3379                                     "serial_no", "hypervisor", "hvparams",] +
3380                                   ["hv/%s" % name
3381                                    for name in constants.HVS_PARAMETERS] +
3382                                   ["be/%s" % name
3383                                    for name in constants.BES_PARAMETERS])
3384   _FIELDS_DYNAMIC = utils.FieldSet("oper_state", "oper_ram", "status")
3385
3386
3387   def ExpandNames(self):
3388     _CheckOutputFields(static=self._FIELDS_STATIC,
3389                        dynamic=self._FIELDS_DYNAMIC,
3390                        selected=self.op.output_fields)
3391
3392     self.needed_locks = {}
3393     self.share_locks[locking.LEVEL_INSTANCE] = 1
3394     self.share_locks[locking.LEVEL_NODE] = 1
3395
3396     if self.op.names:
3397       self.wanted = _GetWantedInstances(self, self.op.names)
3398     else:
3399       self.wanted = locking.ALL_SET
3400
3401     self.do_node_query = self._FIELDS_STATIC.NonMatching(self.op.output_fields)
3402     self.do_locking = self.do_node_query and self.op.use_locking
3403     if self.do_locking:
3404       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
3405       self.needed_locks[locking.LEVEL_NODE] = []
3406       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3407
3408   def DeclareLocks(self, level):
3409     if level == locking.LEVEL_NODE and self.do_locking:
3410       self._LockInstancesNodes()
3411
3412   def CheckPrereq(self):
3413     """Check prerequisites.
3414
3415     """
3416     pass
3417
3418   def Exec(self, feedback_fn):
3419     """Computes the list of nodes and their attributes.
3420
3421     """
3422     all_info = self.cfg.GetAllInstancesInfo()
3423     if self.wanted == locking.ALL_SET:
3424       # caller didn't specify instance names, so ordering is not important
3425       if self.do_locking:
3426         instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
3427       else:
3428         instance_names = all_info.keys()
3429       instance_names = utils.NiceSort(instance_names)
3430     else:
3431       # caller did specify names, so we must keep the ordering
3432       if self.do_locking:
3433         tgt_set = self.acquired_locks[locking.LEVEL_INSTANCE]
3434       else:
3435         tgt_set = all_info.keys()
3436       missing = set(self.wanted).difference(tgt_set)
3437       if missing:
3438         raise errors.OpExecError("Some instances were removed before"
3439                                  " retrieving their data: %s" % missing)
3440       instance_names = self.wanted
3441
3442     instance_list = [all_info[iname] for iname in instance_names]
3443
3444     # begin data gathering
3445
3446     nodes = frozenset([inst.primary_node for inst in instance_list])
3447     hv_list = list(set([inst.hypervisor for inst in instance_list]))
3448
3449     bad_nodes = []
3450     off_nodes = []
3451     if self.do_node_query:
3452       live_data = {}
3453       node_data = self.rpc.call_all_instances_info(nodes, hv_list)
3454       for name in nodes:
3455         result = node_data[name]
3456         if result.offline:
3457           # offline nodes will be in both lists
3458           off_nodes.append(name)
3459         if result.failed or result.RemoteFailMsg():
3460           bad_nodes.append(name)
3461         else:
3462           if result.payload:
3463             live_data.update(result.payload)
3464           # else no instance is alive
3465     else:
3466       live_data = dict([(name, {}) for name in instance_names])
3467
3468     # end data gathering
3469
3470     HVPREFIX = "hv/"
3471     BEPREFIX = "be/"
3472     output = []
3473     for instance in instance_list:
3474       iout = []
3475       i_hv = self.cfg.GetClusterInfo().FillHV(instance)
3476       i_be = self.cfg.GetClusterInfo().FillBE(instance)
3477       for field in self.op.output_fields:
3478         st_match = self._FIELDS_STATIC.Matches(field)
3479         if field == "name":
3480           val = instance.name
3481         elif field == "os":
3482           val = instance.os
3483         elif field == "pnode":
3484           val = instance.primary_node
3485         elif field == "snodes":
3486           val = list(instance.secondary_nodes)
3487         elif field == "admin_state":
3488           val = instance.admin_up
3489         elif field == "oper_state":
3490           if instance.primary_node in bad_nodes:
3491             val = None
3492           else:
3493             val = bool(live_data.get(instance.name))
3494         elif field == "status":
3495           if instance.primary_node in off_nodes:
3496             val = "ERROR_nodeoffline"
3497           elif instance.primary_node in bad_nodes:
3498             val = "ERROR_nodedown"
3499           else:
3500             running = bool(live_data.get(instance.name))
3501             if running:
3502               if instance.admin_up:
3503                 val = "running"
3504               else:
3505                 val = "ERROR_up"
3506             else:
3507               if instance.admin_up:
3508                 val = "ERROR_down"
3509               else:
3510                 val = "ADMIN_down"
3511         elif field == "oper_ram":
3512           if instance.primary_node in bad_nodes:
3513             val = None
3514           elif instance.name in live_data:
3515             val = live_data[instance.name].get("memory", "?")
3516           else:
3517             val = "-"
3518         elif field == "disk_template":
3519           val = instance.disk_template
3520         elif field == "ip":
3521           val = instance.nics[0].ip
3522         elif field == "bridge":
3523           val = instance.nics[0].bridge
3524         elif field == "mac":
3525           val = instance.nics[0].mac
3526         elif field == "sda_size" or field == "sdb_size":
3527           idx = ord(field[2]) - ord('a')
3528           try:
3529             val = instance.FindDisk(idx).size
3530           except errors.OpPrereqError:
3531             val = None
3532         elif field == "disk_usage": # total disk usage per node
3533           disk_sizes = [{'size': disk.size} for disk in instance.disks]
3534           val = _ComputeDiskSize(instance.disk_template, disk_sizes)
3535         elif field == "tags":
3536           val = list(instance.GetTags())
3537         elif field == "serial_no":
3538           val = instance.serial_no
3539         elif field == "network_port":
3540           val = instance.network_port
3541         elif field == "hypervisor":
3542           val = instance.hypervisor
3543         elif field == "hvparams":
3544           val = i_hv
3545         elif (field.startswith(HVPREFIX) and
3546               field[len(HVPREFIX):] in constants.HVS_PARAMETERS):
3547           val = i_hv.get(field[len(HVPREFIX):], None)
3548         elif field == "beparams":
3549           val = i_be
3550         elif (field.startswith(BEPREFIX) and
3551               field[len(BEPREFIX):] in constants.BES_PARAMETERS):
3552           val = i_be.get(field[len(BEPREFIX):], None)
3553         elif st_match and st_match.groups():
3554           # matches a variable list
3555           st_groups = st_match.groups()
3556           if st_groups and st_groups[0] == "disk":
3557             if st_groups[1] == "count":
3558               val = len(instance.disks)
3559             elif st_groups[1] == "sizes":
3560               val = [disk.size for disk in instance.disks]
3561             elif st_groups[1] == "size":
3562               try:
3563                 val = instance.FindDisk(st_groups[2]).size
3564               except errors.OpPrereqError:
3565                 val = None
3566             else:
3567               assert False, "Unhandled disk parameter"
3568           elif st_groups[0] == "nic":
3569             if st_groups[1] == "count":
3570               val = len(instance.nics)
3571             elif st_groups[1] == "macs":
3572               val = [nic.mac for nic in instance.nics]
3573             elif st_groups[1] == "ips":
3574               val = [nic.ip for nic in instance.nics]
3575             elif st_groups[1] == "bridges":
3576               val = [nic.bridge for nic in instance.nics]
3577             else:
3578               # index-based item
3579               nic_idx = int(st_groups[2])
3580               if nic_idx >= len(instance.nics):
3581                 val = None
3582               else:
3583                 if st_groups[1] == "mac":
3584                   val = instance.nics[nic_idx].mac
3585                 elif st_groups[1] == "ip":
3586                   val = instance.nics[nic_idx].ip
3587                 elif st_groups[1] == "bridge":
3588                   val = instance.nics[nic_idx].bridge
3589                 else:
3590                   assert False, "Unhandled NIC parameter"
3591           else:
3592             assert False, "Unhandled variable parameter"
3593         else:
3594           raise errors.ParameterError(field)
3595         iout.append(val)
3596       output.append(iout)
3597
3598     return output
3599
3600
3601 class LUFailoverInstance(LogicalUnit):
3602   """Failover an instance.
3603
3604   """
3605   HPATH = "instance-failover"
3606   HTYPE = constants.HTYPE_INSTANCE
3607   _OP_REQP = ["instance_name", "ignore_consistency"]
3608   REQ_BGL = False
3609
3610   def ExpandNames(self):
3611     self._ExpandAndLockInstance()
3612     self.needed_locks[locking.LEVEL_NODE] = []
3613     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3614
3615   def DeclareLocks(self, level):
3616     if level == locking.LEVEL_NODE:
3617       self._LockInstancesNodes()
3618
3619   def BuildHooksEnv(self):
3620     """Build hooks env.
3621
3622     This runs on master, primary and secondary nodes of the instance.
3623
3624     """
3625     env = {
3626       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
3627       }
3628     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
3629     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3630     return env, nl, nl
3631
3632   def CheckPrereq(self):
3633     """Check prerequisites.
3634
3635     This checks that the instance is in the cluster.
3636
3637     """
3638     self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
3639     assert self.instance is not None, \
3640       "Cannot retrieve locked instance %s" % self.op.instance_name
3641
3642     bep = self.cfg.GetClusterInfo().FillBE(instance)
3643     if instance.disk_template not in constants.DTS_NET_MIRROR:
3644       raise errors.OpPrereqError("Instance's disk layout is not"
3645                                  " network mirrored, cannot failover.")
3646
3647     secondary_nodes = instance.secondary_nodes
3648     if not secondary_nodes:
3649       raise errors.ProgrammerError("no secondary node but using "
3650                                    "a mirrored disk template")
3651
3652     target_node = secondary_nodes[0]
3653     _CheckNodeOnline(self, target_node)
3654     _CheckNodeNotDrained(self, target_node)
3655     # check memory requirements on the secondary node
3656     _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
3657                          instance.name, bep[constants.BE_MEMORY],
3658                          instance.hypervisor)
3659     # check bridge existance
3660     _CheckInstanceBridgesExist(self, instance, node=target_node)
3661
3662   def Exec(self, feedback_fn):
3663     """Failover an instance.
3664
3665     The failover is done by shutting it down on its present node and
3666     starting it on the secondary.
3667
3668     """
3669     instance = self.instance
3670
3671     source_node = instance.primary_node
3672     target_node = instance.secondary_nodes[0]
3673
3674     feedback_fn("* checking disk consistency between source and target")
3675     for dev in instance.disks:
3676       # for drbd, these are drbd over lvm
3677       if not _CheckDiskConsistency(self, dev, target_node, False):
3678         if instance.admin_up and not self.op.ignore_consistency:
3679           raise errors.OpExecError("Disk %s is degraded on target node,"
3680                                    " aborting failover." % dev.iv_name)
3681
3682     feedback_fn("* shutting down instance on source node")
3683     logging.info("Shutting down instance %s on node %s",
3684                  instance.name, source_node)
3685
3686     result = self.rpc.call_instance_shutdown(source_node, instance)
3687     msg = result.RemoteFailMsg()
3688     if msg:
3689       if self.op.ignore_consistency:
3690         self.proc.LogWarning("Could not shutdown instance %s on node %s."
3691                              " Proceeding anyway. Please make sure node"
3692                              " %s is down. Error details: %s",
3693                              instance.name, source_node, source_node, msg)
3694       else:
3695         raise errors.OpExecError("Could not shutdown instance %s on"
3696                                  " node %s: %s" %
3697                                  (instance.name, source_node, msg))
3698
3699     feedback_fn("* deactivating the instance's disks on source node")
3700     if not _ShutdownInstanceDisks(self, instance, ignore_primary=True):
3701       raise errors.OpExecError("Can't shut down the instance's disks.")
3702
3703     instance.primary_node = target_node
3704     # distribute new instance config to the other nodes
3705     self.cfg.Update(instance)
3706
3707     # Only start the instance if it's marked as up
3708     if instance.admin_up:
3709       feedback_fn("* activating the instance's disks on target node")
3710       logging.info("Starting instance %s on node %s",
3711                    instance.name, target_node)
3712
3713       disks_ok, dummy = _AssembleInstanceDisks(self, instance,
3714                                                ignore_secondaries=True)
3715       if not disks_ok:
3716         _ShutdownInstanceDisks(self, instance)
3717         raise errors.OpExecError("Can't activate the instance's disks")
3718
3719       feedback_fn("* starting the instance on the target node")
3720       result = self.rpc.call_instance_start(target_node, instance, None, None)
3721       msg = result.RemoteFailMsg()
3722       if msg:
3723         _ShutdownInstanceDisks(self, instance)
3724         raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3725                                  (instance.name, target_node, msg))
3726
3727
3728 class LUMigrateInstance(LogicalUnit):
3729   """Migrate an instance.
3730
3731   This is migration without shutting down, compared to the failover,
3732   which is done with shutdown.
3733
3734   """
3735   HPATH = "instance-migrate"
3736   HTYPE = constants.HTYPE_INSTANCE
3737   _OP_REQP = ["instance_name", "live", "cleanup"]
3738
3739   REQ_BGL = False
3740
3741   def ExpandNames(self):
3742     self._ExpandAndLockInstance()
3743     self.needed_locks[locking.LEVEL_NODE] = []
3744     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
3745
3746   def DeclareLocks(self, level):
3747     if level == locking.LEVEL_NODE:
3748       self._LockInstancesNodes()
3749
3750   def BuildHooksEnv(self):
3751     """Build hooks env.
3752
3753     This runs on master, primary and secondary nodes of the instance.
3754
3755     """
3756     env = _BuildInstanceHookEnvByObject(self, self.instance)
3757     env["MIGRATE_LIVE"] = self.op.live
3758     env["MIGRATE_CLEANUP"] = self.op.cleanup
3759     nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
3760     return env, nl, nl
3761
3762   def CheckPrereq(self):
3763     """Check prerequisites.
3764
3765     This checks that the instance is in the cluster.
3766
3767     """
3768     instance = self.cfg.GetInstanceInfo(
3769       self.cfg.ExpandInstanceName(self.op.instance_name))
3770     if instance is None:
3771       raise errors.OpPrereqError("Instance '%s' not known" %
3772                                  self.op.instance_name)
3773
3774     if instance.disk_template != constants.DT_DRBD8:
3775       raise errors.OpPrereqError("Instance's disk layout is not"
3776                                  " drbd8, cannot migrate.")
3777
3778     secondary_nodes = instance.secondary_nodes
3779     if not secondary_nodes:
3780       raise errors.ConfigurationError("No secondary node but using"
3781                                       " drbd8 disk template")
3782
3783     i_be = self.cfg.GetClusterInfo().FillBE(instance)
3784
3785     target_node = secondary_nodes[0]
3786     # check memory requirements on the secondary node
3787     _CheckNodeFreeMemory(self, target_node, "migrating instance %s" %
3788                          instance.name, i_be[constants.BE_MEMORY],
3789                          instance.hypervisor)
3790
3791     # check bridge existance
3792     _CheckInstanceBridgesExist(self, instance, node=target_node)
3793
3794     if not self.op.cleanup:
3795       _CheckNodeNotDrained(self, target_node)
3796       result = self.rpc.call_instance_migratable(instance.primary_node,
3797                                                  instance)
3798       msg = result.RemoteFailMsg()
3799       if msg:
3800         raise errors.OpPrereqError("Can't migrate: %s - please use failover" %
3801                                    msg)
3802
3803     self.instance = instance
3804
3805   def _WaitUntilSync(self):
3806     """Poll with custom rpc for disk sync.
3807
3808     This uses our own step-based rpc call.
3809
3810     """
3811     self.feedback_fn("* wait until resync is done")
3812     all_done = False
3813     while not all_done:
3814       all_done = True
3815       result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3816                                             self.nodes_ip,
3817                                             self.instance.disks)
3818       min_percent = 100
3819       for node, nres in result.items():
3820         msg = nres.RemoteFailMsg()
3821         if msg:
3822           raise errors.OpExecError("Cannot resync disks on node %s: %s" %
3823                                    (node, msg))
3824         node_done, node_percent = nres.payload
3825         all_done = all_done and node_done
3826         if node_percent is not None:
3827           min_percent = min(min_percent, node_percent)
3828       if not all_done:
3829         if min_percent < 100:
3830           self.feedback_fn("   - progress: %.1f%%" % min_percent)
3831         time.sleep(2)
3832
3833   def _EnsureSecondary(self, node):
3834     """Demote a node to secondary.
3835
3836     """
3837     self.feedback_fn("* switching node %s to secondary mode" % node)
3838
3839     for dev in self.instance.disks:
3840       self.cfg.SetDiskID(dev, node)
3841
3842     result = self.rpc.call_blockdev_close(node, self.instance.name,
3843                                           self.instance.disks)
3844     msg = result.RemoteFailMsg()
3845     if msg:
3846       raise errors.OpExecError("Cannot change disk to secondary on node %s,"
3847                                " error %s" % (node, msg))
3848
3849   def _GoStandalone(self):
3850     """Disconnect from the network.
3851
3852     """
3853     self.feedback_fn("* changing into standalone mode")
3854     result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3855                                                self.instance.disks)
3856     for node, nres in result.items():
3857       msg = nres.RemoteFailMsg()
3858       if msg:
3859         raise errors.OpExecError("Cannot disconnect disks node %s,"
3860                                  " error %s" % (node, msg))
3861
3862   def _GoReconnect(self, multimaster):
3863     """Reconnect to the network.
3864
3865     """
3866     if multimaster:
3867       msg = "dual-master"
3868     else:
3869       msg = "single-master"
3870     self.feedback_fn("* changing disks into %s mode" % msg)
3871     result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3872                                            self.instance.disks,
3873                                            self.instance.name, multimaster)
3874     for node, nres in result.items():
3875       msg = nres.RemoteFailMsg()
3876       if msg:
3877         raise errors.OpExecError("Cannot change disks config on node %s,"
3878                                  " error: %s" % (node, msg))
3879
3880   def _ExecCleanup(self):
3881     """Try to cleanup after a failed migration.
3882
3883     The cleanup is done by:
3884       - check that the instance is running only on one node
3885         (and update the config if needed)
3886       - change disks on its secondary node to secondary
3887       - wait until disks are fully synchronized
3888       - disconnect from the network
3889       - change disks into single-master mode
3890       - wait again until disks are fully synchronized
3891
3892     """
3893     instance = self.instance
3894     target_node = self.target_node
3895     source_node = self.source_node
3896
3897     # check running on only one node
3898     self.feedback_fn("* checking where the instance actually runs"
3899                      " (if this hangs, the hypervisor might be in"
3900                      " a bad state)")
3901     ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3902     for node, result in ins_l.items():
3903       msg = result.RemoteFailMsg()
3904       if msg:
3905         raise errors.OpExecError("Can't contact node %s: %s" % (node, msg))
3906
3907     runningon_source = instance.name in ins_l[source_node].payload
3908     runningon_target = instance.name in ins_l[target_node].payload
3909
3910     if runningon_source and runningon_target:
3911       raise errors.OpExecError("Instance seems to be running on two nodes,"
3912                                " or the hypervisor is confused. You will have"
3913                                " to ensure manually that it runs only on one"
3914                                " and restart this operation.")
3915
3916     if not (runningon_source or runningon_target):
3917       raise errors.OpExecError("Instance does not seem to be running at all."
3918                                " In this case, it's safer to repair by"
3919                                " running 'gnt-instance stop' to ensure disk"
3920                                " shutdown, and then restarting it.")
3921
3922     if runningon_target:
3923       # the migration has actually succeeded, we need to update the config
3924       self.feedback_fn("* instance running on secondary node (%s),"
3925                        " updating config" % target_node)
3926       instance.primary_node = target_node
3927       self.cfg.Update(instance)
3928       demoted_node = source_node
3929     else:
3930       self.feedback_fn("* instance confirmed to be running on its"
3931                        " primary node (%s)" % source_node)
3932       demoted_node = target_node
3933
3934     self._EnsureSecondary(demoted_node)
3935     try:
3936       self._WaitUntilSync()
3937     except errors.OpExecError:
3938       # we ignore here errors, since if the device is standalone, it
3939       # won't be able to sync
3940       pass
3941     self._GoStandalone()
3942     self._GoReconnect(False)
3943     self._WaitUntilSync()
3944
3945     self.feedback_fn("* done")
3946
3947   def _RevertDiskStatus(self):
3948     """Try to revert the disk status after a failed migration.
3949
3950     """
3951     target_node = self.target_node
3952     try:
3953       self._EnsureSecondary(target_node)
3954       self._GoStandalone()
3955       self._GoReconnect(False)
3956       self._WaitUntilSync()
3957     except errors.OpExecError, err:
3958       self.LogWarning("Migration failed and I can't reconnect the"
3959                       " drives: error '%s'\n"
3960                       "Please look and recover the instance status" %
3961                       str(err))
3962
3963   def _AbortMigration(self):
3964     """Call the hypervisor code to abort a started migration.
3965
3966     """
3967     instance = self.instance
3968     target_node = self.target_node
3969     migration_info = self.migration_info
3970
3971     abort_result = self.rpc.call_finalize_migration(target_node,
3972                                                     instance,
3973                                                     migration_info,
3974                                                     False)
3975     abort_msg = abort_result.RemoteFailMsg()
3976     if abort_msg:
3977       logging.error("Aborting migration failed on target node %s: %s" %
3978                     (target_node, abort_msg))
3979       # Don't raise an exception here, as we stil have to try to revert the
3980       # disk status, even if this step failed.
3981
3982   def _ExecMigration(self):
3983     """Migrate an instance.
3984
3985     The migrate is done by:
3986       - change the disks into dual-master mode
3987       - wait until disks are fully synchronized again
3988       - migrate the instance
3989       - change disks on the new secondary node (the old primary) to secondary
3990       - wait until disks are fully synchronized
3991       - change disks into single-master mode
3992
3993     """
3994     instance = self.instance
3995     target_node = self.target_node
3996     source_node = self.source_node
3997
3998     self.feedback_fn("* checking disk consistency between source and target")
3999     for dev in instance.disks:
4000       if not _CheckDiskConsistency(self, dev, target_node, False):
4001         raise errors.OpExecError("Disk %s is degraded or not fully"
4002                                  " synchronized on target node,"
4003                                  " aborting migrate." % dev.iv_name)
4004
4005     # First get the migration information from the remote node
4006     result = self.rpc.call_migration_info(source_node, instance)
4007     msg = result.RemoteFailMsg()
4008     if msg:
4009       log_err = ("Failed fetching source migration information from %s: %s" %
4010                  (source_node, msg))
4011       logging.error(log_err)
4012       raise errors.OpExecError(log_err)
4013
4014     self.migration_info = migration_info = result.payload
4015
4016     # Then switch the disks to master/master mode
4017     self._EnsureSecondary(target_node)
4018     self._GoStandalone()
4019     self._GoReconnect(True)
4020     self._WaitUntilSync()
4021
4022     self.feedback_fn("* preparing %s to accept the instance" % target_node)
4023     result = self.rpc.call_accept_instance(target_node,
4024                                            instance,
4025                                            migration_info,
4026                                            self.nodes_ip[target_node])
4027
4028     msg = result.RemoteFailMsg()
4029     if msg:
4030       logging.error("Instance pre-migration failed, trying to revert"
4031                     " disk status: %s", msg)
4032       self._AbortMigration()
4033       self._RevertDiskStatus()
4034       raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
4035                                (instance.name, msg))
4036
4037     self.feedback_fn("* migrating instance to %s" % target_node)
4038     time.sleep(10)
4039     result = self.rpc.call_instance_migrate(source_node, instance,
4040                                             self.nodes_ip[target_node],
4041                                             self.op.live)
4042     msg = result.RemoteFailMsg()
4043     if msg:
4044       logging.error("Instance migration failed, trying to revert"
4045                     " disk status: %s", msg)
4046       self._AbortMigration()
4047       self._RevertDiskStatus()
4048       raise errors.OpExecError("Could not migrate instance %s: %s" %
4049                                (instance.name, msg))
4050     time.sleep(10)
4051
4052     instance.primary_node = target_node
4053     # distribute new instance config to the other nodes
4054     self.cfg.Update(instance)
4055
4056     result = self.rpc.call_finalize_migration(target_node,
4057                                               instance,
4058                                               migration_info,
4059                                               True)
4060     msg = result.RemoteFailMsg()
4061     if msg:
4062       logging.error("Instance migration succeeded, but finalization failed:"
4063                     " %s" % msg)
4064       raise errors.OpExecError("Could not finalize instance migration: %s" %
4065                                msg)
4066
4067     self._EnsureSecondary(source_node)
4068     self._WaitUntilSync()
4069     self._GoStandalone()
4070     self._GoReconnect(False)
4071     self._WaitUntilSync()
4072
4073     self.feedback_fn("* done")
4074
4075   def Exec(self, feedback_fn):
4076     """Perform the migration.
4077
4078     """
4079     self.feedback_fn = feedback_fn
4080
4081     self.source_node = self.instance.primary_node
4082     self.target_node = self.instance.secondary_nodes[0]
4083     self.all_nodes = [self.source_node, self.target_node]
4084     self.nodes_ip = {
4085       self.source_node: self.cfg.GetNodeInfo(self.source_node).secondary_ip,
4086       self.target_node: self.cfg.GetNodeInfo(self.target_node).secondary_ip,
4087       }
4088     if self.op.cleanup:
4089       return self._ExecCleanup()
4090     else:
4091       return self._ExecMigration()
4092
4093
4094 def _CreateBlockDev(lu, node, instance, device, force_create,
4095                     info, force_open):
4096   """Create a tree of block devices on a given node.
4097
4098   If this device type has to be created on secondaries, create it and
4099   all its children.
4100
4101   If not, just recurse to children keeping the same 'force' value.
4102
4103   @param lu: the lu on whose behalf we execute
4104   @param node: the node on which to create the device
4105   @type instance: L{objects.Instance}
4106   @param instance: the instance which owns the device
4107   @type device: L{objects.Disk}
4108   @param device: the device to create
4109   @type force_create: boolean
4110   @param force_create: whether to force creation of this device; this
4111       will be change to True whenever we find a device which has
4112       CreateOnSecondary() attribute
4113   @param info: the extra 'metadata' we should attach to the device
4114       (this will be represented as a LVM tag)
4115   @type force_open: boolean
4116   @param force_open: this parameter will be passes to the
4117       L{backend.BlockdevCreate} function where it specifies
4118       whether we run on primary or not, and it affects both
4119       the child assembly and the device own Open() execution
4120
4121   """
4122   if device.CreateOnSecondary():
4123     force_create = True
4124
4125   if device.children:
4126     for child in device.children:
4127       _CreateBlockDev(lu, node, instance, child, force_create,
4128                       info, force_open)
4129
4130   if not force_create:
4131     return
4132
4133   _CreateSingleBlockDev(lu, node, instance, device, info, force_open)
4134
4135
4136 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open):
4137   """Create a single block device on a given node.
4138
4139   This will not recurse over children of the device, so they must be
4140   created in advance.
4141
4142   @param lu: the lu on whose behalf we execute
4143   @param node: the node on which to create the device
4144   @type instance: L{objects.Instance}
4145   @param instance: the instance which owns the device
4146   @type device: L{objects.Disk}
4147   @param device: the device to create
4148   @param info: the extra 'metadata' we should attach to the device
4149       (this will be represented as a LVM tag)
4150   @type force_open: boolean
4151   @param force_open: this parameter will be passes to the
4152       L{backend.BlockdevCreate} function where it specifies
4153       whether we run on primary or not, and it affects both
4154       the child assembly and the device own Open() execution
4155
4156   """
4157   lu.cfg.SetDiskID(device, node)
4158   result = lu.rpc.call_blockdev_create(node, device, device.size,
4159                                        instance.name, force_open, info)
4160   msg = result.RemoteFailMsg()
4161   if msg:
4162     raise errors.OpExecError("Can't create block device %s on"
4163                              " node %s for instance %s: %s" %
4164                              (device, node, instance.name, msg))
4165   if device.physical_id is None:
4166     device.physical_id = result.payload
4167
4168
4169 def _GenerateUniqueNames(lu, exts):
4170   """Generate a suitable LV name.
4171
4172   This will generate a logical volume name for the given instance.
4173
4174   """
4175   results = []
4176   for val in exts:
4177     new_id = lu.cfg.GenerateUniqueID()
4178     results.append("%s%s" % (new_id, val))
4179   return results
4180
4181
4182 def _GenerateDRBD8Branch(lu, primary, secondary, size, names, iv_name,
4183                          p_minor, s_minor):
4184   """Generate a drbd8 device complete with its children.
4185
4186   """
4187   port = lu.cfg.AllocatePort()
4188   vgname = lu.cfg.GetVGName()
4189   shared_secret = lu.cfg.GenerateDRBDSecret()
4190   dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
4191                           logical_id=(vgname, names[0]))
4192   dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
4193                           logical_id=(vgname, names[1]))
4194   drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
4195                           logical_id=(primary, secondary, port,
4196                                       p_minor, s_minor,
4197                                       shared_secret),
4198                           children=[dev_data, dev_meta],
4199                           iv_name=iv_name)
4200   return drbd_dev
4201
4202
4203 def _GenerateDiskTemplate(lu, template_name,
4204                           instance_name, primary_node,
4205                           secondary_nodes, disk_info,
4206                           file_storage_dir, file_driver,
4207                           base_index):
4208   """Generate the entire disk layout for a given template type.
4209
4210   """
4211   #TODO: compute space requirements
4212
4213   vgname = lu.cfg.GetVGName()
4214   disk_count = len(disk_info)
4215   disks = []
4216   if template_name == constants.DT_DISKLESS:
4217     pass
4218   elif template_name == constants.DT_PLAIN:
4219     if len(secondary_nodes) != 0:
4220       raise errors.ProgrammerError("Wrong template configuration")
4221
4222     names = _GenerateUniqueNames(lu, [".disk%d" % i
4223                                       for i in range(disk_count)])
4224     for idx, disk in enumerate(disk_info):
4225       disk_index = idx + base_index
4226       disk_dev = objects.Disk(dev_type=constants.LD_LV, size=disk["size"],
4227                               logical_id=(vgname, names[idx]),
4228                               iv_name="disk/%d" % disk_index,
4229                               mode=disk["mode"])
4230       disks.append(disk_dev)
4231   elif template_name == constants.DT_DRBD8:
4232     if len(secondary_nodes) != 1:
4233       raise errors.ProgrammerError("Wrong template configuration")
4234     remote_node = secondary_nodes[0]
4235     minors = lu.cfg.AllocateDRBDMinor(
4236       [primary_node, remote_node] * len(disk_info), instance_name)
4237
4238     names = []
4239     for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
4240                                                for i in range(disk_count)]):
4241       names.append(lv_prefix + "_data")
4242       names.append(lv_prefix + "_meta")
4243     for idx, disk in enumerate(disk_info):
4244       disk_index = idx + base_index
4245       disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
4246                                       disk["size"], names[idx*2:idx*2+2],
4247                                       "disk/%d" % disk_index,
4248                                       minors[idx*2], minors[idx*2+1])
4249       disk_dev.mode = disk["mode"]
4250       disks.append(disk_dev)
4251   elif template_name == constants.DT_FILE:
4252     if len(secondary_nodes) != 0:
4253       raise errors.ProgrammerError("Wrong template configuration")
4254
4255     for idx, disk in enumerate(disk_info):
4256       disk_index = idx + base_index
4257       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
4258                               iv_name="disk/%d" % disk_index,
4259                               logical_id=(file_driver,
4260                                           "%s/disk%d" % (file_storage_dir,
4261                                                          disk_index)),
4262                               mode=disk["mode"])
4263       disks.append(disk_dev)
4264   else:
4265     raise errors.ProgrammerError("Invalid disk template '%s'" % template_name)
4266   return disks
4267
4268
4269 def _GetInstanceInfoText(instance):
4270   """Compute that text that should be added to the disk's metadata.
4271
4272   """
4273   return "originstname+%s" % instance.name
4274
4275
4276 def _CreateDisks(lu, instance):
4277   """Create all disks for an instance.
4278
4279   This abstracts away some work from AddInstance.
4280
4281   @type lu: L{LogicalUnit}
4282   @param lu: the logical unit on whose behalf we execute
4283   @type instance: L{objects.Instance}
4284   @param instance: the instance whose disks we should create
4285   @rtype: boolean
4286   @return: the success of the creation
4287
4288   """
4289   info = _GetInstanceInfoText(instance)
4290   pnode = instance.primary_node
4291
4292   if instance.disk_template == constants.DT_FILE:
4293     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4294     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
4295
4296     if result.failed or not result.data:
4297       raise errors.OpExecError("Could not connect to node '%s'" % pnode)
4298
4299     if not result.data[0]:
4300       raise errors.OpExecError("Failed to create directory '%s'" %
4301                                file_storage_dir)
4302
4303   # Note: this needs to be kept in sync with adding of disks in
4304   # LUSetInstanceParams
4305   for device in instance.disks:
4306     logging.info("Creating volume %s for instance %s",
4307                  device.iv_name, instance.name)
4308     #HARDCODE
4309     for node in instance.all_nodes:
4310       f_create = node == pnode
4311       _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
4312
4313
4314 def _RemoveDisks(lu, instance):
4315   """Remove all disks for an instance.
4316
4317   This abstracts away some work from `AddInstance()` and
4318   `RemoveInstance()`. Note that in case some of the devices couldn't
4319   be removed, the removal will continue with the other ones (compare
4320   with `_CreateDisks()`).
4321
4322   @type lu: L{LogicalUnit}
4323   @param lu: the logical unit on whose behalf we execute
4324   @type instance: L{objects.Instance}
4325   @param instance: the instance whose disks we should remove
4326   @rtype: boolean
4327   @return: the success of the removal
4328
4329   """
4330   logging.info("Removing block devices for instance %s", instance.name)
4331
4332   all_result = True
4333   for device in instance.disks:
4334     for node, disk in device.ComputeNodeTree(instance.primary_node):
4335       lu.cfg.SetDiskID(disk, node)
4336       msg = lu.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
4337       if msg:
4338         lu.LogWarning("Could not remove block device %s on node %s,"
4339                       " continuing anyway: %s", device.iv_name, node, msg)
4340         all_result = False
4341
4342   if instance.disk_template == constants.DT_FILE:
4343     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
4344     result = lu.rpc.call_file_storage_dir_remove(instance.primary_node,
4345                                                  file_storage_dir)
4346     if result.failed or not result.data:
4347       logging.error("Could not remove directory '%s'", file_storage_dir)
4348       all_result = False
4349
4350   return all_result
4351
4352
4353 def _ComputeDiskSize(disk_template, disks):
4354   """Compute disk size requirements in the volume group
4355
4356   """
4357   # Required free disk space as a function of disk and swap space
4358   req_size_dict = {
4359     constants.DT_DISKLESS: None,
4360     constants.DT_PLAIN: sum(d["size"] for d in disks),
4361     # 128 MB are added for drbd metadata for each disk
4362     constants.DT_DRBD8: sum(d["size"] + 128 for d in disks),
4363     constants.DT_FILE: None,
4364   }
4365
4366   if disk_template not in req_size_dict:
4367     raise errors.ProgrammerError("Disk template '%s' size requirement"
4368                                  " is unknown" %  disk_template)
4369
4370   return req_size_dict[disk_template]
4371
4372
4373 def _CheckHVParams(lu, nodenames, hvname, hvparams):
4374   """Hypervisor parameter validation.
4375
4376   This function abstract the hypervisor parameter validation to be
4377   used in both instance create and instance modify.
4378
4379   @type lu: L{LogicalUnit}
4380   @param lu: the logical unit for which we check
4381   @type nodenames: list
4382   @param nodenames: the list of nodes on which we should check
4383   @type hvname: string
4384   @param hvname: the name of the hypervisor we should use
4385   @type hvparams: dict
4386   @param hvparams: the parameters which we need to check
4387   @raise errors.OpPrereqError: if the parameters are not valid
4388
4389   """
4390   hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames,
4391                                                   hvname,
4392                                                   hvparams)
4393   for node in nodenames:
4394     info = hvinfo[node]
4395     if info.offline:
4396       continue
4397     msg = info.RemoteFailMsg()
4398     if msg:
4399       raise errors.OpPrereqError("Hypervisor parameter validation"
4400                                  " failed on node %s: %s" % (node, msg))
4401
4402
4403 class LUCreateInstance(LogicalUnit):
4404   """Create an instance.
4405
4406   """
4407   HPATH = "instance-add"
4408   HTYPE = constants.HTYPE_INSTANCE
4409   _OP_REQP = ["instance_name", "disks", "disk_template",
4410               "mode", "start",
4411               "wait_for_sync", "ip_check", "nics",
4412               "hvparams", "beparams"]
4413   REQ_BGL = False
4414
4415   def _ExpandNode(self, node):
4416     """Expands and checks one node name.
4417
4418     """
4419     node_full = self.cfg.ExpandNodeName(node)
4420     if node_full is None:
4421       raise errors.OpPrereqError("Unknown node %s" % node)
4422     return node_full
4423
4424   def ExpandNames(self):
4425     """ExpandNames for CreateInstance.
4426
4427     Figure out the right locks for instance creation.
4428
4429     """
4430     self.needed_locks = {}
4431
4432     # set optional parameters to none if they don't exist
4433     for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
4434       if not hasattr(self.op, attr):
4435         setattr(self.op, attr, None)
4436
4437     # cheap checks, mostly valid constants given
4438
4439     # verify creation mode
4440     if self.op.mode not in (constants.INSTANCE_CREATE,
4441                             constants.INSTANCE_IMPORT):
4442       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
4443                                  self.op.mode)
4444
4445     # disk template and mirror node verification
4446     if self.op.disk_template not in constants.DISK_TEMPLATES:
4447       raise errors.OpPrereqError("Invalid disk template name")
4448
4449     if self.op.hypervisor is None:
4450       self.op.hypervisor = self.cfg.GetHypervisorType()
4451
4452     cluster = self.cfg.GetClusterInfo()
4453     enabled_hvs = cluster.enabled_hypervisors
4454     if self.op.hypervisor not in enabled_hvs:
4455       raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
4456                                  " cluster (%s)" % (self.op.hypervisor,
4457                                   ",".join(enabled_hvs)))
4458
4459     # check hypervisor parameter syntax (locally)
4460     utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
4461     filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
4462                                   self.op.hvparams)
4463     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
4464     hv_type.CheckParameterSyntax(filled_hvp)
4465
4466     # fill and remember the beparams dict
4467     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
4468     self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
4469                                     self.op.beparams)
4470
4471     #### instance parameters check
4472
4473     # instance name verification
4474     hostname1 = utils.HostInfo(self.op.instance_name)
4475     self.op.instance_name = instance_name = hostname1.name
4476
4477     # this is just a preventive check, but someone might still add this
4478     # instance in the meantime, and creation will fail at lock-add time
4479     if instance_name in self.cfg.GetInstanceList():
4480       raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
4481                                  instance_name)
4482
4483     self.add_locks[locking.LEVEL_INSTANCE] = instance_name
4484
4485     # NIC buildup
4486     self.nics = []
4487     for idx, nic in enumerate(self.op.nics):
4488       nic_mode_req = nic.get("mode", None)
4489       nic_mode = nic_mode_req
4490       if nic_mode is None:
4491         nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
4492
4493       # in routed mode, for the first nic, the default ip is 'auto'
4494       if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
4495         default_ip_mode = constants.VALUE_AUTO
4496       else:
4497         default_ip_mode = constants.VALUE_NONE
4498
4499       # ip validity checks
4500       ip = nic.get("ip", default_ip_mode)
4501       if ip is None or ip.lower() == constants.VALUE_NONE:
4502         nic_ip = None
4503       elif ip.lower() == constants.VALUE_AUTO:
4504         nic_ip = hostname1.ip
4505       else:
4506         if not utils.IsValidIP(ip):
4507           raise errors.OpPrereqError("Given IP address '%s' doesn't look"
4508                                      " like a valid IP" % ip)
4509         nic_ip = ip
4510
4511       # TODO: check the ip for uniqueness !!
4512       if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
4513         raise errors.OpPrereqError("Routed nic mode requires an ip address")
4514
4515       # MAC address verification
4516       mac = nic.get("mac", constants.VALUE_AUTO)
4517       if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4518         if not utils.IsValidMac(mac.lower()):
4519           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
4520                                      mac)
4521       # bridge verification
4522       bridge = nic.get("bridge", None)
4523       link = nic.get("link", None)
4524       if bridge and link:
4525         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
4526       elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
4527         raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic")
4528       elif bridge:
4529         link = bridge
4530
4531       nicparams = {}
4532       if nic_mode_req:
4533         nicparams[constants.NIC_MODE] = nic_mode_req
4534       if link:
4535         nicparams[constants.NIC_LINK] = link
4536
4537       check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
4538                                       nicparams)
4539       objects.NIC.CheckParameterSyntax(check_params)
4540       self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
4541
4542     # disk checks/pre-build
4543     self.disks = []
4544     for disk in self.op.disks:
4545       mode = disk.get("mode", constants.DISK_RDWR)
4546       if mode not in constants.DISK_ACCESS_SET:
4547         raise errors.OpPrereqError("Invalid disk access mode '%s'" %
4548                                    mode)
4549       size = disk.get("size", None)
4550       if size is None:
4551         raise errors.OpPrereqError("Missing disk size")
4552       try:
4553         size = int(size)
4554       except ValueError:
4555         raise errors.OpPrereqError("Invalid disk size '%s'" % size)
4556       self.disks.append({"size": size, "mode": mode})
4557
4558     # used in CheckPrereq for ip ping check
4559     self.check_ip = hostname1.ip
4560
4561     # file storage checks
4562     if (self.op.file_driver and
4563         not self.op.file_driver in constants.FILE_DRIVER):
4564       raise errors.OpPrereqError("Invalid file driver name '%s'" %
4565                                  self.op.file_driver)
4566
4567     if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
4568       raise errors.OpPrereqError("File storage directory path not absolute")
4569
4570     ### Node/iallocator related checks
4571     if [self.op.iallocator, self.op.pnode].count(None) != 1:
4572       raise errors.OpPrereqError("One and only one of iallocator and primary"
4573                                  " node must be given")
4574
4575     if self.op.iallocator:
4576       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4577     else:
4578       self.op.pnode = self._ExpandNode(self.op.pnode)
4579       nodelist = [self.op.pnode]
4580       if self.op.snode is not None:
4581         self.op.snode = self._ExpandNode(self.op.snode)
4582         nodelist.append(self.op.snode)
4583       self.needed_locks[locking.LEVEL_NODE] = nodelist
4584
4585     # in case of import lock the source node too
4586     if self.op.mode == constants.INSTANCE_IMPORT:
4587       src_node = getattr(self.op, "src_node", None)
4588       src_path = getattr(self.op, "src_path", None)
4589
4590       if src_path is None:
4591         self.op.src_path = src_path = self.op.instance_name
4592
4593       if src_node is None:
4594         self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
4595         self.op.src_node = None
4596         if os.path.isabs(src_path):
4597           raise errors.OpPrereqError("Importing an instance from an absolute"
4598                                      " path requires a source node option.")
4599       else:
4600         self.op.src_node = src_node = self._ExpandNode(src_node)
4601         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
4602           self.needed_locks[locking.LEVEL_NODE].append(src_node)
4603         if not os.path.isabs(src_path):
4604           self.op.src_path = src_path = \
4605             os.path.join(constants.EXPORT_DIR, src_path)
4606
4607     else: # INSTANCE_CREATE
4608       if getattr(self.op, "os_type", None) is None:
4609         raise errors.OpPrereqError("No guest OS specified")
4610
4611   def _RunAllocator(self):
4612     """Run the allocator based on input opcode.
4613
4614     """
4615     nics = [n.ToDict() for n in self.nics]
4616     ial = IAllocator(self,
4617                      mode=constants.IALLOCATOR_MODE_ALLOC,
4618                      name=self.op.instance_name,
4619                      disk_template=self.op.disk_template,
4620                      tags=[],
4621                      os=self.op.os_type,
4622                      vcpus=self.be_full[constants.BE_VCPUS],
4623                      mem_size=self.be_full[constants.BE_MEMORY],
4624                      disks=self.disks,
4625                      nics=nics,
4626                      hypervisor=self.op.hypervisor,
4627                      )
4628
4629     ial.Run(self.op.iallocator)
4630
4631     if not ial.success:
4632       raise errors.OpPrereqError("Can't compute nodes using"
4633                                  " iallocator '%s': %s" % (self.op.iallocator,
4634                                                            ial.info))
4635     if len(ial.nodes) != ial.required_nodes:
4636       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
4637                                  " of nodes (%s), required %s" %
4638                                  (self.op.iallocator, len(ial.nodes),
4639                                   ial.required_nodes))
4640     self.op.pnode = ial.nodes[0]
4641     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
4642                  self.op.instance_name, self.op.iallocator,
4643                  ", ".join(ial.nodes))
4644     if ial.required_nodes == 2:
4645       self.op.snode = ial.nodes[1]
4646
4647   def BuildHooksEnv(self):
4648     """Build hooks env.
4649
4650     This runs on master, primary and secondary nodes of the instance.
4651
4652     """
4653     env = {
4654       "ADD_MODE": self.op.mode,
4655       }
4656     if self.op.mode == constants.INSTANCE_IMPORT:
4657       env["SRC_NODE"] = self.op.src_node
4658       env["SRC_PATH"] = self.op.src_path
4659       env["SRC_IMAGES"] = self.src_images
4660
4661     env.update(_BuildInstanceHookEnv(
4662       name=self.op.instance_name,
4663       primary_node=self.op.pnode,
4664       secondary_nodes=self.secondaries,
4665       status=self.op.start,
4666       os_type=self.op.os_type,
4667       memory=self.be_full[constants.BE_MEMORY],
4668       vcpus=self.be_full[constants.BE_VCPUS],
4669       nics=_PreBuildNICHooksList(self, self.nics),
4670       disk_template=self.op.disk_template,
4671       disks=[(d["size"], d["mode"]) for d in self.disks],
4672     ))
4673
4674     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
4675           self.secondaries)
4676     return env, nl, nl
4677
4678
4679   def CheckPrereq(self):
4680     """Check prerequisites.
4681
4682     """
4683     if (not self.cfg.GetVGName() and
4684         self.op.disk_template not in constants.DTS_NOT_LVM):
4685       raise errors.OpPrereqError("Cluster does not support lvm-based"
4686                                  " instances")
4687
4688     if self.op.mode == constants.INSTANCE_IMPORT:
4689       src_node = self.op.src_node
4690       src_path = self.op.src_path
4691
4692       if src_node is None:
4693         locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
4694         exp_list = self.rpc.call_export_list(locked_nodes)
4695         found = False
4696         for node in exp_list:
4697           if exp_list[node].RemoteFailMsg():
4698             continue
4699           if src_path in exp_list[node].payload:
4700             found = True
4701             self.op.src_node = src_node = node
4702             self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
4703                                                        src_path)
4704             break
4705         if not found:
4706           raise errors.OpPrereqError("No export found for relative path %s" %
4707                                       src_path)
4708
4709       _CheckNodeOnline(self, src_node)
4710       result = self.rpc.call_export_info(src_node, src_path)
4711       msg = result.RemoteFailMsg()
4712       if msg:
4713         raise errors.OpPrereqError("No export or invalid export found in"
4714                                    " dir %s: %s" % (src_path, msg))
4715
4716       export_info = objects.SerializableConfigParser.Loads(str(result.payload))
4717       if not export_info.has_section(constants.INISECT_EXP):
4718         raise errors.ProgrammerError("Corrupted export config")
4719
4720       ei_version = export_info.get(constants.INISECT_EXP, 'version')
4721       if (int(ei_version) != constants.EXPORT_VERSION):
4722         raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
4723                                    (ei_version, constants.EXPORT_VERSION))
4724
4725       # Check that the new instance doesn't have less disks than the export
4726       instance_disks = len(self.disks)
4727       export_disks = export_info.getint(constants.INISECT_INS, 'disk_count')
4728       if instance_disks < export_disks:
4729         raise errors.OpPrereqError("Not enough disks to import."
4730                                    " (instance: %d, export: %d)" %
4731                                    (instance_disks, export_disks))
4732
4733       self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
4734       disk_images = []
4735       for idx in range(export_disks):
4736         option = 'disk%d_dump' % idx
4737         if export_info.has_option(constants.INISECT_INS, option):
4738           # FIXME: are the old os-es, disk sizes, etc. useful?
4739           export_name = export_info.get(constants.INISECT_INS, option)
4740           image = os.path.join(src_path, export_name)
4741           disk_images.append(image)
4742         else:
4743           disk_images.append(False)
4744
4745       self.src_images = disk_images
4746
4747       old_name = export_info.get(constants.INISECT_INS, 'name')
4748       # FIXME: int() here could throw a ValueError on broken exports
4749       exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
4750       if self.op.instance_name == old_name:
4751         for idx, nic in enumerate(self.nics):
4752           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
4753             nic_mac_ini = 'nic%d_mac' % idx
4754             nic.mac = export_info.get(constants.INISECT_INS, nic_mac_ini)
4755
4756     # ENDIF: self.op.mode == constants.INSTANCE_IMPORT
4757     # ip ping checks (we use the same ip that was resolved in ExpandNames)
4758     if self.op.start and not self.op.ip_check:
4759       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
4760                                  " adding an instance in start mode")
4761
4762     if self.op.ip_check:
4763       if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
4764         raise errors.OpPrereqError("IP %s of instance %s already in use" %
4765                                    (self.check_ip, self.op.instance_name))
4766
4767     #### mac address generation
4768     # By generating here the mac address both the allocator and the hooks get
4769     # the real final mac address rather than the 'auto' or 'generate' value.
4770     # There is a race condition between the generation and the instance object
4771     # creation, which means that we know the mac is valid now, but we're not
4772     # sure it will be when we actually add the instance. If things go bad
4773     # adding the instance will abort because of a duplicate mac, and the
4774     # creation job will fail.
4775     for nic in self.nics:
4776       if nic.mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
4777         nic.mac = self.cfg.GenerateMAC()
4778
4779     #### allocator run
4780
4781     if self.op.iallocator is not None:
4782       self._RunAllocator()
4783
4784     #### node related checks
4785
4786     # check primary node
4787     self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
4788     assert self.pnode is not None, \
4789       "Cannot retrieve locked node %s" % self.op.pnode
4790     if pnode.offline:
4791       raise errors.OpPrereqError("Cannot use offline primary node '%s'" %
4792                                  pnode.name)
4793     if pnode.drained:
4794       raise errors.OpPrereqError("Cannot use drained primary node '%s'" %
4795                                  pnode.name)
4796
4797     self.secondaries = []
4798
4799     # mirror node verification
4800     if self.op.disk_template in constants.DTS_NET_MIRROR:
4801       if self.op.snode is None:
4802         raise errors.OpPrereqError("The networked disk templates need"
4803                                    " a mirror node")
4804       if self.op.snode == pnode.name:
4805         raise errors.OpPrereqError("The secondary node cannot be"
4806                                    " the primary node.")
4807       _CheckNodeOnline(self, self.op.snode)
4808       _CheckNodeNotDrained(self, self.op.snode)
4809       self.secondaries.append(self.op.snode)
4810
4811     nodenames = [pnode.name] + self.secondaries
4812
4813     req_size = _ComputeDiskSize(self.op.disk_template,
4814                                 self.disks)
4815
4816     # Check lv size requirements
4817     if req_size is not None:
4818       nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
4819                                          self.op.hypervisor)
4820       for node in nodenames:
4821         info = nodeinfo[node]
4822         msg = info.RemoteFailMsg()
4823         if msg:
4824           raise errors.OpPrereqError("Cannot get current information"
4825                                      " from node %s: %s" % (node, msg))
4826         info = info.payload
4827         vg_free = info.get('vg_free', None)
4828         if not isinstance(vg_free, int):
4829           raise errors.OpPrereqError("Can't compute free disk space on"
4830                                      " node %s" % node)
4831         if req_size > vg_free:
4832           raise errors.OpPrereqError("Not enough disk space on target node %s."
4833                                      " %d MB available, %d MB required" %
4834                                      (node, vg_free, req_size))
4835
4836     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
4837
4838     # os verification
4839     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
4840     result.Raise()
4841     if not isinstance(result.data, objects.OS):
4842       raise errors.OpPrereqError("OS '%s' not in supported os list for"
4843                                  " primary node"  % self.op.os_type)
4844
4845     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
4846
4847     # memory check on primary node
4848     if self.op.start:
4849       _CheckNodeFreeMemory(self, self.pnode.name,
4850                            "creating instance %s" % self.op.instance_name,
4851                            self.be_full[constants.BE_MEMORY],
4852                            self.op.hypervisor)
4853
4854   def Exec(self, feedback_fn):
4855     """Create and add the instance to the cluster.
4856
4857     """
4858     instance = self.op.instance_name
4859     pnode_name = self.pnode.name
4860
4861     ht_kind = self.op.hypervisor
4862     if ht_kind in constants.HTS_REQ_PORT:
4863       network_port = self.cfg.AllocatePort()
4864     else:
4865       network_port = None
4866
4867     ##if self.op.vnc_bind_address is None:
4868     ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
4869
4870     # this is needed because os.path.join does not accept None arguments
4871     if self.op.file_storage_dir is None:
4872       string_file_storage_dir = ""
4873     else:
4874       string_file_storage_dir = self.op.file_storage_dir
4875
4876     # build the full file storage dir path
4877     file_storage_dir = os.path.normpath(os.path.join(
4878                                         self.cfg.GetFileStorageDir(),
4879                                         string_file_storage_dir, instance))
4880
4881
4882     disks = _GenerateDiskTemplate(self,
4883                                   self.op.disk_template,
4884                                   instance, pnode_name,
4885                                   self.secondaries,
4886                                   self.disks,
4887                                   file_storage_dir,
4888                                   self.op.file_driver,
4889                                   0)
4890
4891     iobj = objects.Instance(name=instance, os=self.op.os_type,
4892                             primary_node=pnode_name,
4893                             nics=self.nics, disks=disks,
4894                             disk_template=self.op.disk_template,
4895                             admin_up=False,
4896                             network_port=network_port,
4897                             beparams=self.op.beparams,
4898                             hvparams=self.op.hvparams,
4899                             hypervisor=self.op.hypervisor,
4900                             )
4901
4902     feedback_fn("* creating instance disks...")
4903     try:
4904       _CreateDisks(self, iobj)
4905     except errors.OpExecError:
4906       self.LogWarning("Device creation failed, reverting...")
4907       try:
4908         _RemoveDisks(self, iobj)
4909       finally:
4910         self.cfg.ReleaseDRBDMinors(instance)
4911         raise
4912
4913     feedback_fn("adding instance %s to cluster config" % instance)
4914
4915     self.cfg.AddInstance(iobj)
4916     # Declare that we don't want to remove the instance lock anymore, as we've
4917     # added the instance to the config
4918     del self.remove_locks[locking.LEVEL_INSTANCE]
4919     # Unlock all the nodes
4920     if self.op.mode == constants.INSTANCE_IMPORT:
4921       nodes_keep = [self.op.src_node]
4922       nodes_release = [node for node in self.acquired_locks[locking.LEVEL_NODE]
4923                        if node != self.op.src_node]
4924       self.context.glm.release(locking.LEVEL_NODE, nodes_release)
4925       self.acquired_locks[locking.LEVEL_NODE] = nodes_keep
4926     else:
4927       self.context.glm.release(locking.LEVEL_NODE)
4928       del self.acquired_locks[locking.LEVEL_NODE]
4929
4930     if self.op.wait_for_sync:
4931       disk_abort = not _WaitForSync(self, iobj)
4932     elif iobj.disk_template in constants.DTS_NET_MIRROR:
4933       # make sure the disks are not degraded (still sync-ing is ok)
4934       time.sleep(15)
4935       feedback_fn("* checking mirrors status")
4936       disk_abort = not _WaitForSync(self, iobj, oneshot=True)
4937     else:
4938       disk_abort = False
4939
4940     if disk_abort:
4941       _RemoveDisks(self, iobj)
4942       self.cfg.RemoveInstance(iobj.name)
4943       # Make sure the instance lock gets removed
4944       self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
4945       raise errors.OpExecError("There are some degraded disks for"
4946                                " this instance")
4947
4948     feedback_fn("creating os for instance %s on node %s" %
4949                 (instance, pnode_name))
4950
4951     if iobj.disk_template != constants.DT_DISKLESS:
4952       if self.op.mode == constants.INSTANCE_CREATE:
4953         feedback_fn("* running the instance OS create scripts...")
4954         result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
4955         msg = result.RemoteFailMsg()
4956         if msg:
4957           raise errors.OpExecError("Could not add os for instance %s"
4958                                    " on node %s: %s" %
4959                                    (instance, pnode_name, msg))
4960
4961       elif self.op.mode == constants.INSTANCE_IMPORT:
4962         feedback_fn("* running the instance OS import scripts...")
4963         src_node = self.op.src_node
4964         src_images = self.src_images
4965         cluster_name = self.cfg.GetClusterName()
4966         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
4967                                                          src_node, src_images,
4968                                                          cluster_name)
4969         msg = import_result.RemoteFailMsg()
4970         if msg:
4971           self.LogWarning("Error while importing the disk images for instance"
4972                           " %s on node %s: %s" % (instance, pnode_name, msg))
4973       else:
4974         # also checked in the prereq part
4975         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
4976                                      % self.op.mode)
4977
4978     if self.op.start:
4979       iobj.admin_up = True
4980       self.cfg.Update(iobj)
4981       logging.info("Starting instance %s on node %s", instance, pnode_name)
4982       feedback_fn("* starting instance...")
4983       result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
4984       msg = result.RemoteFailMsg()
4985       if msg:
4986         raise errors.OpExecError("Could not start instance: %s" % msg)
4987
4988
4989 class LUConnectConsole(NoHooksLU):
4990   """Connect to an instance's console.
4991
4992   This is somewhat special in that it returns the command line that
4993   you need to run on the master node in order to connect to the
4994   console.
4995
4996   """
4997   _OP_REQP = ["instance_name"]
4998   REQ_BGL = False
4999
5000   def ExpandNames(self):
5001     self._ExpandAndLockInstance()
5002
5003   def CheckPrereq(self):
5004     """Check prerequisites.
5005
5006     This checks that the instance is in the cluster.
5007
5008     """
5009     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5010     assert self.instance is not None, \
5011       "Cannot retrieve locked instance %s" % self.op.instance_name
5012     _CheckNodeOnline(self, self.instance.primary_node)
5013
5014   def Exec(self, feedback_fn):
5015     """Connect to the console of an instance
5016
5017     """
5018     instance = self.instance
5019     node = instance.primary_node
5020
5021     node_insts = self.rpc.call_instance_list([node],
5022                                              [instance.hypervisor])[node]
5023     msg = node_insts.RemoteFailMsg()
5024     if msg:
5025       raise errors.OpExecError("Can't get node information from %s: %s" %
5026                                (node, msg))
5027
5028     if instance.name not in node_insts.payload:
5029       raise errors.OpExecError("Instance %s is not running." % instance.name)
5030
5031     logging.debug("Connecting to console of %s on %s", instance.name, node)
5032
5033     hyper = hypervisor.GetHypervisor(instance.hypervisor)
5034     cluster = self.cfg.GetClusterInfo()
5035     # beparams and hvparams are passed separately, to avoid editing the
5036     # instance and then saving the defaults in the instance itself.
5037     hvparams = cluster.FillHV(instance)
5038     beparams = cluster.FillBE(instance)
5039     console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
5040
5041     # build ssh cmdline
5042     return self.ssh.BuildCmd(node, "root", console_cmd, batch=True, tty=True)
5043
5044
5045 class LUReplaceDisks(LogicalUnit):
5046   """Replace the disks of an instance.
5047
5048   """
5049   HPATH = "mirrors-replace"
5050   HTYPE = constants.HTYPE_INSTANCE
5051   _OP_REQP = ["instance_name", "mode", "disks"]
5052   REQ_BGL = False
5053
5054   def CheckArguments(self):
5055     if not hasattr(self.op, "remote_node"):
5056       self.op.remote_node = None
5057     if not hasattr(self.op, "iallocator"):
5058       self.op.iallocator = None
5059
5060     # check for valid parameter combination
5061     cnt = [self.op.remote_node, self.op.iallocator].count(None)
5062     if self.op.mode == constants.REPLACE_DISK_CHG:
5063       if cnt == 2:
5064         raise errors.OpPrereqError("When changing the secondary either an"
5065                                    " iallocator script must be used or the"
5066                                    " new node given")
5067       elif cnt == 0:
5068         raise errors.OpPrereqError("Give either the iallocator or the new"
5069                                    " secondary, not both")
5070     else: # not replacing the secondary
5071       if cnt != 2:
5072         raise errors.OpPrereqError("The iallocator and new node options can"
5073                                    " be used only when changing the"
5074                                    " secondary node")
5075
5076   def ExpandNames(self):
5077     self._ExpandAndLockInstance()
5078
5079     if self.op.iallocator is not None:
5080       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
5081     elif self.op.remote_node is not None:
5082       remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
5083       if remote_node is None:
5084         raise errors.OpPrereqError("Node '%s' not known" %
5085                                    self.op.remote_node)
5086       self.op.remote_node = remote_node
5087       # Warning: do not remove the locking of the new secondary here
5088       # unless DRBD8.AddChildren is changed to work in parallel;
5089       # currently it doesn't since parallel invocations of
5090       # FindUnusedMinor will conflict
5091       self.needed_locks[locking.LEVEL_NODE] = [remote_node]
5092       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
5093     else:
5094       self.needed_locks[locking.LEVEL_NODE] = []
5095       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5096
5097   def DeclareLocks(self, level):
5098     # If we're not already locking all nodes in the set we have to declare the
5099     # instance's primary/secondary nodes.
5100     if (level == locking.LEVEL_NODE and
5101         self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
5102       self._LockInstancesNodes()
5103
5104   def _RunAllocator(self):
5105     """Compute a new secondary node using an IAllocator.
5106
5107     """
5108     ial = IAllocator(self,
5109                      mode=constants.IALLOCATOR_MODE_RELOC,
5110                      name=self.op.instance_name,
5111                      relocate_from=[self.sec_node])
5112
5113     ial.Run(self.op.iallocator)
5114
5115     if not ial.success:
5116       raise errors.OpPrereqError("Can't compute nodes using"
5117                                  " iallocator '%s': %s" % (self.op.iallocator,
5118                                                            ial.info))
5119     if len(ial.nodes) != ial.required_nodes:
5120       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
5121                                  " of nodes (%s), required %s" %
5122                                  (len(ial.nodes), ial.required_nodes))
5123     self.op.remote_node = ial.nodes[0]
5124     self.LogInfo("Selected new secondary for the instance: %s",
5125                  self.op.remote_node)
5126
5127   def BuildHooksEnv(self):
5128     """Build hooks env.
5129
5130     This runs on the master, the primary and all the secondaries.
5131
5132     """
5133     env = {
5134       "MODE": self.op.mode,
5135       "NEW_SECONDARY": self.op.remote_node,
5136       "OLD_SECONDARY": self.instance.secondary_nodes[0],
5137       }
5138     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5139     nl = [
5140       self.cfg.GetMasterNode(),
5141       self.instance.primary_node,
5142       ]
5143     if self.op.remote_node is not None:
5144       nl.append(self.op.remote_node)
5145     return env, nl, nl
5146
5147   def CheckPrereq(self):
5148     """Check prerequisites.
5149
5150     This checks that the instance is in the cluster.
5151
5152     """
5153     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5154     assert instance is not None, \
5155       "Cannot retrieve locked instance %s" % self.op.instance_name
5156     self.instance = instance
5157
5158     if instance.disk_template != constants.DT_DRBD8:
5159       raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
5160                                  " instances")
5161
5162     if len(instance.secondary_nodes) != 1:
5163       raise errors.OpPrereqError("The instance has a strange layout,"
5164                                  " expected one secondary but found %d" %
5165                                  len(instance.secondary_nodes))
5166
5167     self.sec_node = instance.secondary_nodes[0]
5168
5169     if self.op.iallocator is not None:
5170       self._RunAllocator()
5171
5172     remote_node = self.op.remote_node
5173     if remote_node is not None:
5174       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
5175       assert self.remote_node_info is not None, \
5176         "Cannot retrieve locked node %s" % remote_node
5177     else:
5178       self.remote_node_info = None
5179     if remote_node == instance.primary_node:
5180       raise errors.OpPrereqError("The specified node is the primary node of"
5181                                  " the instance.")
5182     elif remote_node == self.sec_node:
5183       raise errors.OpPrereqError("The specified node is already the"
5184                                  " secondary node of the instance.")
5185
5186     if self.op.mode == constants.REPLACE_DISK_PRI:
5187       n1 = self.tgt_node = instance.primary_node
5188       n2 = self.oth_node = self.sec_node
5189     elif self.op.mode == constants.REPLACE_DISK_SEC:
5190       n1 = self.tgt_node = self.sec_node
5191       n2 = self.oth_node = instance.primary_node
5192     elif self.op.mode == constants.REPLACE_DISK_CHG:
5193       n1 = self.new_node = remote_node
5194       n2 = self.oth_node = instance.primary_node
5195       self.tgt_node = self.sec_node
5196       _CheckNodeNotDrained(self, remote_node)
5197     else:
5198       raise errors.ProgrammerError("Unhandled disk replace mode")
5199
5200     _CheckNodeOnline(self, n1)
5201     _CheckNodeOnline(self, n2)
5202
5203     if not self.op.disks:
5204       self.op.disks = range(len(instance.disks))
5205
5206     for disk_idx in self.op.disks:
5207       instance.FindDisk(disk_idx)
5208
5209   def _ExecD8DiskOnly(self, feedback_fn):
5210     """Replace a disk on the primary or secondary for dbrd8.
5211
5212     The algorithm for replace is quite complicated:
5213
5214       1. for each disk to be replaced:
5215
5216         1. create new LVs on the target node with unique names
5217         1. detach old LVs from the drbd device
5218         1. rename old LVs to name_replaced.<time_t>
5219         1. rename new LVs to old LVs
5220         1. attach the new LVs (with the old names now) to the drbd device
5221
5222       1. wait for sync across all devices
5223
5224       1. for each modified disk:
5225
5226         1. remove old LVs (which have the name name_replaces.<time_t>)
5227
5228     Failures are not very well handled.
5229
5230     """
5231     steps_total = 6
5232     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5233     instance = self.instance
5234     iv_names = {}
5235     vgname = self.cfg.GetVGName()
5236     # start of work
5237     cfg = self.cfg
5238     tgt_node = self.tgt_node
5239     oth_node = self.oth_node
5240
5241     # Step: check device activation
5242     self.proc.LogStep(1, steps_total, "check device existence")
5243     info("checking volume groups")
5244     my_vg = cfg.GetVGName()
5245     results = self.rpc.call_vg_list([oth_node, tgt_node])
5246     if not results:
5247       raise errors.OpExecError("Can't list volume groups on the nodes")
5248     for node in oth_node, tgt_node:
5249       res = results[node]
5250       msg = res.RemoteFailMsg()
5251       if msg:
5252         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5253       if my_vg not in res.payload:
5254         raise errors.OpExecError("Volume group '%s' not found on %s" %
5255                                  (my_vg, node))
5256     for idx, dev in enumerate(instance.disks):
5257       if idx not in self.op.disks:
5258         continue
5259       for node in tgt_node, oth_node:
5260         info("checking disk/%d on %s" % (idx, node))
5261         cfg.SetDiskID(dev, node)
5262         result = self.rpc.call_blockdev_find(node, dev)
5263         msg = result.RemoteFailMsg()
5264         if not msg and not result.payload:
5265           msg = "disk not found"
5266         if msg:
5267           raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5268                                    (idx, node, msg))
5269
5270     # Step: check other node consistency
5271     self.proc.LogStep(2, steps_total, "check peer consistency")
5272     for idx, dev in enumerate(instance.disks):
5273       if idx not in self.op.disks:
5274         continue
5275       info("checking disk/%d consistency on %s" % (idx, oth_node))
5276       if not _CheckDiskConsistency(self, dev, oth_node,
5277                                    oth_node==instance.primary_node):
5278         raise errors.OpExecError("Peer node (%s) has degraded storage, unsafe"
5279                                  " to replace disks on this node (%s)" %
5280                                  (oth_node, tgt_node))
5281
5282     # Step: create new storage
5283     self.proc.LogStep(3, steps_total, "allocate new storage")
5284     for idx, dev in enumerate(instance.disks):
5285       if idx not in self.op.disks:
5286         continue
5287       size = dev.size
5288       cfg.SetDiskID(dev, tgt_node)
5289       lv_names = [".disk%d_%s" % (idx, suf)
5290                   for suf in ["data", "meta"]]
5291       names = _GenerateUniqueNames(self, lv_names)
5292       lv_data = objects.Disk(dev_type=constants.LD_LV, size=size,
5293                              logical_id=(vgname, names[0]))
5294       lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
5295                              logical_id=(vgname, names[1]))
5296       new_lvs = [lv_data, lv_meta]
5297       old_lvs = dev.children
5298       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
5299       info("creating new local storage on %s for %s" %
5300            (tgt_node, dev.iv_name))
5301       # we pass force_create=True to force the LVM creation
5302       for new_lv in new_lvs:
5303         _CreateBlockDev(self, tgt_node, instance, new_lv, True,
5304                         _GetInstanceInfoText(instance), False)
5305
5306     # Step: for each lv, detach+rename*2+attach
5307     self.proc.LogStep(4, steps_total, "change drbd configuration")
5308     for dev, old_lvs, new_lvs in iv_names.itervalues():
5309       info("detaching %s drbd from local storage" % dev.iv_name)
5310       result = self.rpc.call_blockdev_removechildren(tgt_node, dev, old_lvs)
5311       msg = result.RemoteFailMsg()
5312       if msg:
5313         raise errors.OpExecError("Can't detach drbd from local storage on node"
5314                                  " %s for device %s: %s" %
5315                                  (tgt_node, dev.iv_name, msg))
5316       #dev.children = []
5317       #cfg.Update(instance)
5318
5319       # ok, we created the new LVs, so now we know we have the needed
5320       # storage; as such, we proceed on the target node to rename
5321       # old_lv to _old, and new_lv to old_lv; note that we rename LVs
5322       # using the assumption that logical_id == physical_id (which in
5323       # turn is the unique_id on that node)
5324
5325       # FIXME(iustin): use a better name for the replaced LVs
5326       temp_suffix = int(time.time())
5327       ren_fn = lambda d, suff: (d.physical_id[0],
5328                                 d.physical_id[1] + "_replaced-%s" % suff)
5329       # build the rename list based on what LVs exist on the node
5330       rlist = []
5331       for to_ren in old_lvs:
5332         result = self.rpc.call_blockdev_find(tgt_node, to_ren)
5333         if not result.RemoteFailMsg() and result.payload:
5334           # device exists
5335           rlist.append((to_ren, ren_fn(to_ren, temp_suffix)))
5336
5337       info("renaming the old LVs on the target node")
5338       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5339       msg = result.RemoteFailMsg()
5340       if msg:
5341         raise errors.OpExecError("Can't rename old LVs on node %s: %s" %
5342                                  (tgt_node, msg))
5343       # now we rename the new LVs to the old LVs
5344       info("renaming the new LVs on the target node")
5345       rlist = [(new, old.physical_id) for old, new in zip(old_lvs, new_lvs)]
5346       result = self.rpc.call_blockdev_rename(tgt_node, rlist)
5347       msg = result.RemoteFailMsg()
5348       if msg:
5349         raise errors.OpExecError("Can't rename new LVs on node %s: %s" %
5350                                  (tgt_node, msg))
5351
5352       for old, new in zip(old_lvs, new_lvs):
5353         new.logical_id = old.logical_id
5354         cfg.SetDiskID(new, tgt_node)
5355
5356       for disk in old_lvs:
5357         disk.logical_id = ren_fn(disk, temp_suffix)
5358         cfg.SetDiskID(disk, tgt_node)
5359
5360       # now that the new lvs have the old name, we can add them to the device
5361       info("adding new mirror component on %s" % tgt_node)
5362       result = self.rpc.call_blockdev_addchildren(tgt_node, dev, new_lvs)
5363       msg = result.RemoteFailMsg()
5364       if msg:
5365         for new_lv in new_lvs:
5366           msg = self.rpc.call_blockdev_remove(tgt_node, new_lv).RemoteFailMsg()
5367           if msg:
5368             warning("Can't rollback device %s: %s", dev, msg,
5369                     hint="cleanup manually the unused logical volumes")
5370         raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
5371
5372       dev.children = new_lvs
5373       cfg.Update(instance)
5374
5375     # Step: wait for sync
5376
5377     # this can fail as the old devices are degraded and _WaitForSync
5378     # does a combined result over all disks, so we don't check its
5379     # return value
5380     self.proc.LogStep(5, steps_total, "sync devices")
5381     _WaitForSync(self, instance, unlock=True)
5382
5383     # so check manually all the devices
5384     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5385       cfg.SetDiskID(dev, instance.primary_node)
5386       result = self.rpc.call_blockdev_find(instance.primary_node, dev)
5387       msg = result.RemoteFailMsg()
5388       if not msg and not result.payload:
5389         msg = "disk not found"
5390       if msg:
5391         raise errors.OpExecError("Can't find DRBD device %s: %s" %
5392                                  (name, msg))
5393       if result.payload[5]:
5394         raise errors.OpExecError("DRBD device %s is degraded!" % name)
5395
5396     # Step: remove old storage
5397     self.proc.LogStep(6, steps_total, "removing old storage")
5398     for name, (dev, old_lvs, new_lvs) in iv_names.iteritems():
5399       info("remove logical volumes for %s" % name)
5400       for lv in old_lvs:
5401         cfg.SetDiskID(lv, tgt_node)
5402         msg = self.rpc.call_blockdev_remove(tgt_node, lv).RemoteFailMsg()
5403         if msg:
5404           warning("Can't remove old LV: %s" % msg,
5405                   hint="manually remove unused LVs")
5406           continue
5407
5408   def _ExecD8Secondary(self, feedback_fn):
5409     """Replace the secondary node for drbd8.
5410
5411     The algorithm for replace is quite complicated:
5412       - for all disks of the instance:
5413         - create new LVs on the new node with same names
5414         - shutdown the drbd device on the old secondary
5415         - disconnect the drbd network on the primary
5416         - create the drbd device on the new secondary
5417         - network attach the drbd on the primary, using an artifice:
5418           the drbd code for Attach() will connect to the network if it
5419           finds a device which is connected to the good local disks but
5420           not network enabled
5421       - wait for sync across all devices
5422       - remove all disks from the old secondary
5423
5424     Failures are not very well handled.
5425
5426     """
5427     steps_total = 6
5428     warning, info = (self.proc.LogWarning, self.proc.LogInfo)
5429     instance = self.instance
5430     iv_names = {}
5431     # start of work
5432     cfg = self.cfg
5433     old_node = self.tgt_node
5434     new_node = self.new_node
5435     pri_node = instance.primary_node
5436     nodes_ip = {
5437       old_node: self.cfg.GetNodeInfo(old_node).secondary_ip,
5438       new_node: self.cfg.GetNodeInfo(new_node).secondary_ip,
5439       pri_node: self.cfg.GetNodeInfo(pri_node).secondary_ip,
5440       }
5441
5442     # Step: check device activation
5443     self.proc.LogStep(1, steps_total, "check device existence")
5444     info("checking volume groups")
5445     my_vg = cfg.GetVGName()
5446     results = self.rpc.call_vg_list([pri_node, new_node])
5447     for node in pri_node, new_node:
5448       res = results[node]
5449       msg = res.RemoteFailMsg()
5450       if msg:
5451         raise errors.OpExecError("Error checking node %s: %s" % (node, msg))
5452       if my_vg not in res.payload:
5453         raise errors.OpExecError("Volume group '%s' not found on %s" %
5454                                  (my_vg, node))
5455     for idx, dev in enumerate(instance.disks):
5456       if idx not in self.op.disks:
5457         continue
5458       info("checking disk/%d on %s" % (idx, pri_node))
5459       cfg.SetDiskID(dev, pri_node)
5460       result = self.rpc.call_blockdev_find(pri_node, dev)
5461       msg = result.RemoteFailMsg()
5462       if not msg and not result.payload:
5463         msg = "disk not found"
5464       if msg:
5465         raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
5466                                  (idx, pri_node, msg))
5467
5468     # Step: check other node consistency
5469     self.proc.LogStep(2, steps_total, "check peer consistency")
5470     for idx, dev in enumerate(instance.disks):
5471       if idx not in self.op.disks:
5472         continue
5473       info("checking disk/%d consistency on %s" % (idx, pri_node))
5474       if not _CheckDiskConsistency(self, dev, pri_node, True, ldisk=True):
5475         raise errors.OpExecError("Primary node (%s) has degraded storage,"
5476                                  " unsafe to replace the secondary" %
5477                                  pri_node)
5478
5479     # Step: create new storage
5480     self.proc.LogStep(3, steps_total, "allocate new storage")
5481     for idx, dev in enumerate(instance.disks):
5482       info("adding new local storage on %s for disk/%d" %
5483            (new_node, idx))
5484       # we pass force_create=True to force LVM creation
5485       for new_lv in dev.children:
5486         _CreateBlockDev(self, new_node, instance, new_lv, True,
5487                         _GetInstanceInfoText(instance), False)
5488
5489     # Step 4: dbrd minors and drbd setups changes
5490     # after this, we must manually remove the drbd minors on both the
5491     # error and the success paths
5492     minors = cfg.AllocateDRBDMinor([new_node for dev in instance.disks],
5493                                    instance.name)
5494     logging.debug("Allocated minors %s" % (minors,))
5495     self.proc.LogStep(4, steps_total, "changing drbd configuration")
5496     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
5497       size = dev.size
5498       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
5499       # create new devices on new_node; note that we create two IDs:
5500       # one without port, so the drbd will be activated without
5501       # networking information on the new node at this stage, and one
5502       # with network, for the latter activation in step 4
5503       (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
5504       if pri_node == o_node1:
5505         p_minor = o_minor1
5506       else:
5507         p_minor = o_minor2
5508
5509       new_alone_id = (pri_node, new_node, None, p_minor, new_minor, o_secret)
5510       new_net_id = (pri_node, new_node, o_port, p_minor, new_minor, o_secret)
5511
5512       iv_names[idx] = (dev, dev.children, new_net_id)
5513       logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
5514                     new_net_id)
5515       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
5516                               logical_id=new_alone_id,
5517                               children=dev.children)
5518       try:
5519         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
5520                               _GetInstanceInfoText(instance), False)
5521       except errors.GenericError:
5522         self.cfg.ReleaseDRBDMinors(instance.name)
5523         raise
5524
5525     for idx, dev in enumerate(instance.disks):
5526       # we have new devices, shutdown the drbd on the old secondary
5527       info("shutting down drbd for disk/%d on old node" % idx)
5528       cfg.SetDiskID(dev, old_node)
5529       msg = self.rpc.call_blockdev_shutdown(old_node, dev).RemoteFailMsg()
5530       if msg:
5531         warning("Failed to shutdown drbd for disk/%d on old node: %s" %
5532                 (idx, msg),
5533                 hint="Please cleanup this device manually as soon as possible")
5534
5535     info("detaching primary drbds from the network (=> standalone)")
5536     result = self.rpc.call_drbd_disconnect_net([pri_node], nodes_ip,
5537                                                instance.disks)[pri_node]
5538
5539     msg = result.RemoteFailMsg()
5540     if msg:
5541       # detaches didn't succeed (unlikely)
5542       self.cfg.ReleaseDRBDMinors(instance.name)
5543       raise errors.OpExecError("Can't detach the disks from the network on"
5544                                " old node: %s" % (msg,))
5545
5546     # if we managed to detach at least one, we update all the disks of
5547     # the instance to point to the new secondary
5548     info("updating instance configuration")
5549     for dev, _, new_logical_id in iv_names.itervalues():
5550       dev.logical_id = new_logical_id
5551       cfg.SetDiskID(dev, pri_node)
5552     cfg.Update(instance)
5553
5554     # and now perform the drbd attach
5555     info("attaching primary drbds to new secondary (standalone => connected)")
5556     result = self.rpc.call_drbd_attach_net([pri_node, new_node], nodes_ip,
5557                                            instance.disks, instance.name,
5558                                            False)
5559     for to_node, to_result in result.items():
5560       msg = to_result.RemoteFailMsg()
5561       if msg:
5562         warning("can't attach drbd disks on node %s: %s", to_node, msg,
5563                 hint="please do a gnt-instance info to see the"
5564                 " status of disks")
5565
5566     # this can fail as the old devices are degraded and _WaitForSync
5567     # does a combined result over all disks, so we don't check its
5568     # return value
5569     self.proc.LogStep(5, steps_total, "sync devices")
5570     _WaitForSync(self, instance, unlock=True)
5571
5572     # so check manually all the devices
5573     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5574       cfg.SetDiskID(dev, pri_node)
5575       result = self.rpc.call_blockdev_find(pri_node, dev)
5576       msg = result.RemoteFailMsg()
5577       if not msg and not result.payload:
5578         msg = "disk not found"
5579       if msg:
5580         raise errors.OpExecError("Can't find DRBD device disk/%d: %s" %
5581                                  (idx, msg))
5582       if result.payload[5]:
5583         raise errors.OpExecError("DRBD device disk/%d is degraded!" % idx)
5584
5585     self.proc.LogStep(6, steps_total, "removing old storage")
5586     for idx, (dev, old_lvs, _) in iv_names.iteritems():
5587       info("remove logical volumes for disk/%d" % idx)
5588       for lv in old_lvs:
5589         cfg.SetDiskID(lv, old_node)
5590         msg = self.rpc.call_blockdev_remove(old_node, lv).RemoteFailMsg()
5591         if msg:
5592           warning("Can't remove LV on old secondary: %s", msg,
5593                   hint="Cleanup stale volumes by hand")
5594
5595   def Exec(self, feedback_fn):
5596     """Execute disk replacement.
5597
5598     This dispatches the disk replacement to the appropriate handler.
5599
5600     """
5601     instance = self.instance
5602
5603     # Activate the instance disks if we're replacing them on a down instance
5604     if not instance.admin_up:
5605       _StartInstanceDisks(self, instance, True)
5606
5607     if self.op.mode == constants.REPLACE_DISK_CHG:
5608       fn = self._ExecD8Secondary
5609     else:
5610       fn = self._ExecD8DiskOnly
5611
5612     ret = fn(feedback_fn)
5613
5614     # Deactivate the instance disks if we're replacing them on a down instance
5615     if not instance.admin_up:
5616       _SafeShutdownInstanceDisks(self, instance)
5617
5618     return ret
5619
5620
5621 class LUGrowDisk(LogicalUnit):
5622   """Grow a disk of an instance.
5623
5624   """
5625   HPATH = "disk-grow"
5626   HTYPE = constants.HTYPE_INSTANCE
5627   _OP_REQP = ["instance_name", "disk", "amount", "wait_for_sync"]
5628   REQ_BGL = False
5629
5630   def ExpandNames(self):
5631     self._ExpandAndLockInstance()
5632     self.needed_locks[locking.LEVEL_NODE] = []
5633     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5634
5635   def DeclareLocks(self, level):
5636     if level == locking.LEVEL_NODE:
5637       self._LockInstancesNodes()
5638
5639   def BuildHooksEnv(self):
5640     """Build hooks env.
5641
5642     This runs on the master, the primary and all the secondaries.
5643
5644     """
5645     env = {
5646       "DISK": self.op.disk,
5647       "AMOUNT": self.op.amount,
5648       }
5649     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
5650     nl = [
5651       self.cfg.GetMasterNode(),
5652       self.instance.primary_node,
5653       ]
5654     return env, nl, nl
5655
5656   def CheckPrereq(self):
5657     """Check prerequisites.
5658
5659     This checks that the instance is in the cluster.
5660
5661     """
5662     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
5663     assert instance is not None, \
5664       "Cannot retrieve locked instance %s" % self.op.instance_name
5665     nodenames = list(instance.all_nodes)
5666     for node in nodenames:
5667       _CheckNodeOnline(self, node)
5668
5669
5670     self.instance = instance
5671
5672     if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
5673       raise errors.OpPrereqError("Instance's disk layout does not support"
5674                                  " growing.")
5675
5676     self.disk = instance.FindDisk(self.op.disk)
5677
5678     nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
5679                                        instance.hypervisor)
5680     for node in nodenames:
5681       info = nodeinfo[node]
5682       msg = info.RemoteFailMsg()
5683       if msg:
5684         raise errors.OpPrereqError("Cannot get current information"
5685                                    " from node %s:" % (node, msg))
5686       vg_free = info.payload.get('vg_free', None)
5687       if not isinstance(vg_free, int):
5688         raise errors.OpPrereqError("Can't compute free disk space on"
5689                                    " node %s" % node)
5690       if self.op.amount > vg_free:
5691         raise errors.OpPrereqError("Not enough disk space on target node %s:"
5692                                    " %d MiB available, %d MiB required" %
5693                                    (node, vg_free, self.op.amount))
5694
5695   def Exec(self, feedback_fn):
5696     """Execute disk grow.
5697
5698     """
5699     instance = self.instance
5700     disk = self.disk
5701     for node in instance.all_nodes:
5702       self.cfg.SetDiskID(disk, node)
5703       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
5704       msg = result.RemoteFailMsg()
5705       if msg:
5706         raise errors.OpExecError("Grow request failed to node %s: %s" %
5707                                  (node, msg))
5708     disk.RecordGrow(self.op.amount)
5709     self.cfg.Update(instance)
5710     if self.op.wait_for_sync:
5711       disk_abort = not _WaitForSync(self, instance)
5712       if disk_abort:
5713         self.proc.LogWarning("Warning: disk sync-ing has not returned a good"
5714                              " status.\nPlease check the instance.")
5715
5716
5717 class LUQueryInstanceData(NoHooksLU):
5718   """Query runtime instance data.
5719
5720   """
5721   _OP_REQP = ["instances", "static"]
5722   REQ_BGL = False
5723
5724   def ExpandNames(self):
5725     self.needed_locks = {}
5726     self.share_locks = dict(((i, 1) for i in locking.LEVELS))
5727
5728     if not isinstance(self.op.instances, list):
5729       raise errors.OpPrereqError("Invalid argument type 'instances'")
5730
5731     if self.op.instances:
5732       self.wanted_names = []
5733       for name in self.op.instances:
5734         full_name = self.cfg.ExpandInstanceName(name)
5735         if full_name is None:
5736           raise errors.OpPrereqError("Instance '%s' not known" % name)
5737         self.wanted_names.append(full_name)
5738       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
5739     else:
5740       self.wanted_names = None
5741       self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
5742
5743     self.needed_locks[locking.LEVEL_NODE] = []
5744     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5745
5746   def DeclareLocks(self, level):
5747     if level == locking.LEVEL_NODE:
5748       self._LockInstancesNodes()
5749
5750   def CheckPrereq(self):
5751     """Check prerequisites.
5752
5753     This only checks the optional instance list against the existing names.
5754
5755     """
5756     if self.wanted_names is None:
5757       self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
5758
5759     self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
5760                              in self.wanted_names]
5761     return
5762
5763   def _ComputeDiskStatus(self, instance, snode, dev):
5764     """Compute block device status.
5765
5766     """
5767     static = self.op.static
5768     if not static:
5769       self.cfg.SetDiskID(dev, instance.primary_node)
5770       dev_pstatus = self.rpc.call_blockdev_find(instance.primary_node, dev)
5771       if dev_pstatus.offline:
5772         dev_pstatus = None
5773       else:
5774         msg = dev_pstatus.RemoteFailMsg()
5775         if msg:
5776           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5777                                    (instance.name, msg))
5778         dev_pstatus = dev_pstatus.payload
5779     else:
5780       dev_pstatus = None
5781
5782     if dev.dev_type in constants.LDS_DRBD:
5783       # we change the snode then (otherwise we use the one passed in)
5784       if dev.logical_id[0] == instance.primary_node:
5785         snode = dev.logical_id[1]
5786       else:
5787         snode = dev.logical_id[0]
5788
5789     if snode and not static:
5790       self.cfg.SetDiskID(dev, snode)
5791       dev_sstatus = self.rpc.call_blockdev_find(snode, dev)
5792       if dev_sstatus.offline:
5793         dev_sstatus = None
5794       else:
5795         msg = dev_sstatus.RemoteFailMsg()
5796         if msg:
5797           raise errors.OpExecError("Can't compute disk status for %s: %s" %
5798                                    (instance.name, msg))
5799         dev_sstatus = dev_sstatus.payload
5800     else:
5801       dev_sstatus = None
5802
5803     if dev.children:
5804       dev_children = [self._ComputeDiskStatus(instance, snode, child)
5805                       for child in dev.children]
5806     else:
5807       dev_children = []
5808
5809     data = {
5810       "iv_name": dev.iv_name,
5811       "dev_type": dev.dev_type,
5812       "logical_id": dev.logical_id,
5813       "physical_id": dev.physical_id,
5814       "pstatus": dev_pstatus,
5815       "sstatus": dev_sstatus,
5816       "children": dev_children,
5817       "mode": dev.mode,
5818       }
5819
5820     return data
5821
5822   def Exec(self, feedback_fn):
5823     """Gather and return data"""
5824     result = {}
5825
5826     cluster = self.cfg.GetClusterInfo()
5827
5828     for instance in self.wanted_instances:
5829       if not self.op.static:
5830         remote_info = self.rpc.call_instance_info(instance.primary_node,
5831                                                   instance.name,
5832                                                   instance.hypervisor)
5833         msg = remote_info.RemoteFailMsg()
5834         if msg:
5835           raise errors.OpExecError("Error checking node %s: %s" %
5836                                    (instance.primary_node, msg))
5837         remote_info = remote_info.payload
5838         if remote_info and "state" in remote_info:
5839           remote_state = "up"
5840         else:
5841           remote_state = "down"
5842       else:
5843         remote_state = None
5844       if instance.admin_up:
5845         config_state = "up"
5846       else:
5847         config_state = "down"
5848
5849       disks = [self._ComputeDiskStatus(instance, None, device)
5850                for device in instance.disks]
5851
5852       idict = {
5853         "name": instance.name,
5854         "config_state": config_state,
5855         "run_state": remote_state,
5856         "pnode": instance.primary_node,
5857         "snodes": instance.secondary_nodes,
5858         "os": instance.os,
5859         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
5860         "disks": disks,
5861         "hypervisor": instance.hypervisor,
5862         "network_port": instance.network_port,
5863         "hv_instance": instance.hvparams,
5864         "hv_actual": cluster.FillHV(instance),
5865         "be_instance": instance.beparams,
5866         "be_actual": cluster.FillBE(instance),
5867         }
5868
5869       result[instance.name] = idict
5870
5871     return result
5872
5873
5874 class LUSetInstanceParams(LogicalUnit):
5875   """Modifies an instances's parameters.
5876
5877   """
5878   HPATH = "instance-modify"
5879   HTYPE = constants.HTYPE_INSTANCE
5880   _OP_REQP = ["instance_name"]
5881   REQ_BGL = False
5882
5883   def CheckArguments(self):
5884     if not hasattr(self.op, 'nics'):
5885       self.op.nics = []
5886     if not hasattr(self.op, 'disks'):
5887       self.op.disks = []
5888     if not hasattr(self.op, 'beparams'):
5889       self.op.beparams = {}
5890     if not hasattr(self.op, 'hvparams'):
5891       self.op.hvparams = {}
5892     self.op.force = getattr(self.op, "force", False)
5893     if not (self.op.nics or self.op.disks or
5894             self.op.hvparams or self.op.beparams):
5895       raise errors.OpPrereqError("No changes submitted")
5896
5897     # Disk validation
5898     disk_addremove = 0
5899     for disk_op, disk_dict in self.op.disks:
5900       if disk_op == constants.DDM_REMOVE:
5901         disk_addremove += 1
5902         continue
5903       elif disk_op == constants.DDM_ADD:
5904         disk_addremove += 1
5905       else:
5906         if not isinstance(disk_op, int):
5907           raise errors.OpPrereqError("Invalid disk index")
5908       if disk_op == constants.DDM_ADD:
5909         mode = disk_dict.setdefault('mode', constants.DISK_RDWR)
5910         if mode not in constants.DISK_ACCESS_SET:
5911           raise errors.OpPrereqError("Invalid disk access mode '%s'" % mode)
5912         size = disk_dict.get('size', None)
5913         if size is None:
5914           raise errors.OpPrereqError("Required disk parameter size missing")
5915         try:
5916           size = int(size)
5917         except ValueError, err:
5918           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
5919                                      str(err))
5920         disk_dict['size'] = size
5921       else:
5922         # modification of disk
5923         if 'size' in disk_dict:
5924           raise errors.OpPrereqError("Disk size change not possible, use"
5925                                      " grow-disk")
5926
5927     if disk_addremove > 1:
5928       raise errors.OpPrereqError("Only one disk add or remove operation"
5929                                  " supported at a time")
5930
5931     # NIC validation
5932     nic_addremove = 0
5933     for nic_op, nic_dict in self.op.nics:
5934       if nic_op == constants.DDM_REMOVE:
5935         nic_addremove += 1
5936         continue
5937       elif nic_op == constants.DDM_ADD:
5938         nic_addremove += 1
5939       else:
5940         if not isinstance(nic_op, int):
5941           raise errors.OpPrereqError("Invalid nic index")
5942
5943       # nic_dict should be a dict
5944       nic_ip = nic_dict.get('ip', None)
5945       if nic_ip is not None:
5946         if nic_ip.lower() == constants.VALUE_NONE:
5947           nic_dict['ip'] = None
5948         else:
5949           if not utils.IsValidIP(nic_ip):
5950             raise errors.OpPrereqError("Invalid IP address '%s'" % nic_ip)
5951
5952       nic_bridge = nic_dict.get('bridge', None)
5953       nic_link = nic_dict.get('link', None)
5954       if nic_bridge and nic_link:
5955         raise errors.OpPrereqError("Cannot pass 'bridge' and 'link' at the same time")
5956       elif nic_bridge and nic_bridge.lower() == constants.VALUE_NONE:
5957         nic_dict['bridge'] = None
5958       elif nic_link and nic_link.lower() == constants.VALUE_NONE:
5959         nic_dict['link'] = None
5960
5961       if nic_op == constants.DDM_ADD:
5962         nic_mac = nic_dict.get('mac', None)
5963         if nic_mac is None:
5964           nic_dict['mac'] = constants.VALUE_AUTO
5965
5966       if 'mac' in nic_dict:
5967         nic_mac = nic_dict['mac']
5968         if nic_mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
5969           if not utils.IsValidMac(nic_mac):
5970             raise errors.OpPrereqError("Invalid MAC address %s" % nic_mac)
5971         if nic_op != constants.DDM_ADD and nic_mac == constants.VALUE_AUTO:
5972           raise errors.OpPrereqError("'auto' is not a valid MAC address when"
5973                                      " modifying an existing nic")
5974
5975     if nic_addremove > 1:
5976       raise errors.OpPrereqError("Only one NIC add or remove operation"
5977                                  " supported at a time")
5978
5979   def ExpandNames(self):
5980     self._ExpandAndLockInstance()
5981     self.needed_locks[locking.LEVEL_NODE] = []
5982     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
5983
5984   def DeclareLocks(self, level):
5985     if level == locking.LEVEL_NODE:
5986       self._LockInstancesNodes()
5987
5988   def BuildHooksEnv(self):
5989     """Build hooks env.
5990
5991     This runs on the master, primary and secondaries.
5992
5993     """
5994     args = dict()
5995     if constants.BE_MEMORY in self.be_new:
5996       args['memory'] = self.be_new[constants.BE_MEMORY]
5997     if constants.BE_VCPUS in self.be_new:
5998       args['vcpus'] = self.be_new[constants.BE_VCPUS]
5999     # TODO: export disk changes. Note: _BuildInstanceHookEnv* don't export disk
6000     # information at all.
6001     if self.op.nics:
6002       args['nics'] = []
6003       nic_override = dict(self.op.nics)
6004       c_nicparams = self.cluster.nicparams[constants.PP_DEFAULT]
6005       for idx, nic in enumerate(self.instance.nics):
6006         if idx in nic_override:
6007           this_nic_override = nic_override[idx]
6008         else:
6009           this_nic_override = {}
6010         if 'ip' in this_nic_override:
6011           ip = this_nic_override['ip']
6012         else:
6013           ip = nic.ip
6014         if 'mac' in this_nic_override:
6015           mac = this_nic_override['mac']
6016         else:
6017           mac = nic.mac
6018         if idx in self.nic_pnew:
6019           nicparams = self.nic_pnew[idx]
6020         else:
6021           nicparams = objects.FillDict(c_nicparams, nic.nicparams)
6022         mode = nicparams[constants.NIC_MODE]
6023         link = nicparams[constants.NIC_LINK]
6024         args['nics'].append((ip, mac, mode, link))
6025       if constants.DDM_ADD in nic_override:
6026         ip = nic_override[constants.DDM_ADD].get('ip', None)
6027         mac = nic_override[constants.DDM_ADD]['mac']
6028         nicparams = self.nic_pnew[constants.DDM_ADD]
6029         mode = nicparams[constants.NIC_MODE]
6030         link = nicparams[constants.NIC_LINK]
6031         args['nics'].append((ip, mac, mode, link))
6032       elif constants.DDM_REMOVE in nic_override:
6033         del args['nics'][-1]
6034
6035     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
6036     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
6037     return env, nl, nl
6038
6039   def _GetUpdatedParams(self, old_params, update_dict,
6040                         default_values, parameter_types):
6041     """Return the new params dict for the given params.
6042
6043     @type old_params: dict
6044     @type old_params: old parameters
6045     @type update_dict: dict
6046     @type update_dict: dict containing new parameter values,
6047                        or constants.VALUE_DEFAULT to reset the
6048                        parameter to its default value
6049     @type default_values: dict
6050     @param default_values: default values for the filled parameters
6051     @type parameter_types: dict
6052     @param parameter_types: dict mapping target dict keys to types
6053                             in constants.ENFORCEABLE_TYPES
6054     @rtype: (dict, dict)
6055     @return: (new_parameters, filled_parameters)
6056
6057     """
6058     params_copy = copy.deepcopy(old_params)
6059     for key, val in update_dict.iteritems():
6060       if val == constants.VALUE_DEFAULT:
6061         try:
6062           del params_copy[key]
6063         except KeyError:
6064           pass
6065       else:
6066         params_copy[key] = val
6067     utils.ForceDictType(params_copy, parameter_types)
6068     params_filled = objects.FillDict(default_values, params_copy)
6069     return (params_copy, params_filled)
6070
6071   def CheckPrereq(self):
6072     """Check prerequisites.
6073
6074     This only checks the instance list against the existing names.
6075
6076     """
6077     force = self.force = self.op.force
6078
6079     # checking the new params on the primary/secondary nodes
6080
6081     instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
6082     cluster = self.cluster = self.cfg.GetClusterInfo()
6083     assert self.instance is not None, \
6084       "Cannot retrieve locked instance %s" % self.op.instance_name
6085     pnode = instance.primary_node
6086     nodelist = list(instance.all_nodes)
6087
6088     # hvparams processing
6089     if self.op.hvparams:
6090       i_hvdict, hv_new = self._GetUpdatedParams(
6091                              instance.hvparams, self.op.hvparams,
6092                              cluster.hvparams[instance.hypervisor],
6093                              constants.HVS_PARAMETER_TYPES)
6094       # local check
6095       hypervisor.GetHypervisor(
6096         instance.hypervisor).CheckParameterSyntax(hv_new)
6097       _CheckHVParams(self, nodelist, instance.hypervisor, hv_new)
6098       self.hv_new = hv_new # the new actual values
6099       self.hv_inst = i_hvdict # the new dict (without defaults)
6100     else:
6101       self.hv_new = self.hv_inst = {}
6102
6103     # beparams processing
6104     if self.op.beparams:
6105       i_bedict, be_new = self._GetUpdatedParams(
6106                              instance.beparams, self.op.beparams,
6107                              cluster.beparams[constants.PP_DEFAULT],
6108                              constants.BES_PARAMETER_TYPES)
6109       self.be_new = be_new # the new actual values
6110       self.be_inst = i_bedict # the new dict (without defaults)
6111     else:
6112       self.be_new = self.be_inst = {}
6113
6114     self.warn = []
6115
6116     if constants.BE_MEMORY in self.op.beparams and not self.force:
6117       mem_check_list = [pnode]
6118       if be_new[constants.BE_AUTO_BALANCE]:
6119         # either we changed auto_balance to yes or it was from before
6120         mem_check_list.extend(instance.secondary_nodes)
6121       instance_info = self.rpc.call_instance_info(pnode, instance.name,
6122                                                   instance.hypervisor)
6123       nodeinfo = self.rpc.call_node_info(mem_check_list, self.cfg.GetVGName(),
6124                                          instance.hypervisor)
6125       pninfo = nodeinfo[pnode]
6126       msg = pninfo.RemoteFailMsg()
6127       if msg:
6128         # Assume the primary node is unreachable and go ahead
6129         self.warn.append("Can't get info from primary node %s: %s" %
6130                          (pnode,  msg))
6131       elif not isinstance(pninfo.payload.get('memory_free', None), int):
6132         self.warn.append("Node data from primary node %s doesn't contain"
6133                          " free memory information" % pnode)
6134       elif instance_info.RemoteFailMsg():
6135         self.warn.append("Can't get instance runtime information: %s" %
6136                         instance_info.RemoteFailMsg())
6137       else:
6138         if instance_info.payload:
6139           current_mem = int(instance_info.payload['memory'])
6140         else:
6141           # Assume instance not running
6142           # (there is a slight race condition here, but it's not very probable,
6143           # and we have no other way to check)
6144           current_mem = 0
6145         miss_mem = (be_new[constants.BE_MEMORY] - current_mem -
6146                     pninfo.payload['memory_free'])
6147         if miss_mem > 0:
6148           raise errors.OpPrereqError("This change will prevent the instance"
6149                                      " from starting, due to %d MB of memory"
6150                                      " missing on its primary node" % miss_mem)
6151
6152       if be_new[constants.BE_AUTO_BALANCE]:
6153         for node, nres in nodeinfo.items():
6154           if node not in instance.secondary_nodes:
6155             continue
6156           msg = nres.RemoteFailMsg()
6157           if msg:
6158             self.warn.append("Can't get info from secondary node %s: %s" %
6159                              (node, msg))
6160           elif not isinstance(nres.payload.get('memory_free', None), int):
6161             self.warn.append("Secondary node %s didn't return free"
6162                              " memory information" % node)
6163           elif be_new[constants.BE_MEMORY] > nres.payload['memory_free']:
6164             self.warn.append("Not enough memory to failover instance to"
6165                              " secondary node %s" % node)
6166
6167     # NIC processing
6168     self.nic_pnew = {}
6169     self.nic_pinst = {}
6170     for nic_op, nic_dict in self.op.nics:
6171       if nic_op == constants.DDM_REMOVE:
6172         if not instance.nics:
6173           raise errors.OpPrereqError("Instance has no NICs, cannot remove")
6174         continue
6175       if nic_op != constants.DDM_ADD:
6176         # an existing nic
6177         if nic_op < 0 or nic_op >= len(instance.nics):
6178           raise errors.OpPrereqError("Invalid NIC index %s, valid values"
6179                                      " are 0 to %d" %
6180                                      (nic_op, len(instance.nics)))
6181         old_nic_params = instance.nics[nic_op].nicparams
6182         old_nic_ip = instance.nics[nic_op].ip
6183       else:
6184         old_nic_params = {}
6185         old_nic_ip = None
6186
6187       update_params_dict = dict([(key, nic_dict[key])
6188                                  for key in constants.NICS_PARAMETERS
6189                                  if key in nic_dict])
6190
6191       if 'bridge' in nic_dict:
6192         update_params_dict[constants.NIC_LINK] = nic_dict['bridge']
6193
6194       new_nic_params, new_filled_nic_params = \
6195           self._GetUpdatedParams(old_nic_params, update_params_dict,
6196                                  cluster.nicparams[constants.PP_DEFAULT],
6197                                  constants.NICS_PARAMETER_TYPES)
6198       objects.NIC.CheckParameterSyntax(new_filled_nic_params)
6199       self.nic_pinst[nic_op] = new_nic_params
6200       self.nic_pnew[nic_op] = new_filled_nic_params
6201       new_nic_mode = new_filled_nic_params[constants.NIC_MODE]
6202
6203       if new_nic_mode == constants.NIC_MODE_BRIDGED:
6204         nic_bridge = new_filled_nic_params[constants.NIC_LINK]
6205         result = self.rpc.call_bridges_exist(pnode, [nic_bridge])
6206         msg = result.RemoteFailMsg()
6207         if msg:
6208           msg = "Error checking bridges on node %s: %s" % (pnode, msg)
6209           if self.force:
6210             self.warn.append(msg)
6211           else:
6212             raise errors.OpPrereqError(msg)
6213       if new_nic_mode == constants.NIC_MODE_ROUTED:
6214         if 'ip' in nic_dict:
6215           nic_ip = nic_dict['ip']
6216         else:
6217           nic_ip = old_nic_ip
6218         if nic_ip is None:
6219           raise errors.OpPrereqError('Cannot set the nic ip to None'
6220                                      ' on a routed nic')
6221       if 'mac' in nic_dict:
6222         nic_mac = nic_dict['mac']
6223         if nic_mac is None:
6224           raise errors.OpPrereqError('Cannot set the nic mac to None')
6225         elif nic_mac in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
6226           # otherwise generate the mac
6227           nic_dict['mac'] = self.cfg.GenerateMAC()
6228         else:
6229           # or validate/reserve the current one
6230           if self.cfg.IsMacInUse(nic_mac):
6231             raise errors.OpPrereqError("MAC address %s already in use"
6232                                        " in cluster" % nic_mac)
6233
6234     # DISK processing
6235     if self.op.disks and instance.disk_template == constants.DT_DISKLESS:
6236       raise errors.OpPrereqError("Disk operations not supported for"
6237                                  " diskless instances")
6238     for disk_op, disk_dict in self.op.disks:
6239       if disk_op == constants.DDM_REMOVE:
6240         if len(instance.disks) == 1:
6241           raise errors.OpPrereqError("Cannot remove the last disk of"
6242                                      " an instance")
6243         ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
6244         ins_l = ins_l[pnode]
6245         msg = ins_l.RemoteFailMsg()
6246         if msg:
6247           raise errors.OpPrereqError("Can't contact node %s: %s" %
6248                                      (pnode, msg))
6249         if instance.name in ins_l.payload:
6250           raise errors.OpPrereqError("Instance is running, can't remove"
6251                                      " disks.")
6252
6253       if (disk_op == constants.DDM_ADD and
6254           len(instance.nics) >= constants.MAX_DISKS):
6255         raise errors.OpPrereqError("Instance has too many disks (%d), cannot"
6256                                    " add more" % constants.MAX_DISKS)
6257       if disk_op not in (constants.DDM_ADD, constants.DDM_REMOVE):
6258         # an existing disk
6259         if disk_op < 0 or disk_op >= len(instance.disks):
6260           raise errors.OpPrereqError("Invalid disk index %s, valid values"
6261                                      " are 0 to %d" %
6262                                      (disk_op, len(instance.disks)))
6263
6264     return
6265
6266   def Exec(self, feedback_fn):
6267     """Modifies an instance.
6268
6269     All parameters take effect only at the next restart of the instance.
6270
6271     """
6272     # Process here the warnings from CheckPrereq, as we don't have a
6273     # feedback_fn there.
6274     for warn in self.warn:
6275       feedback_fn("WARNING: %s" % warn)
6276
6277     result = []
6278     instance = self.instance
6279     cluster = self.cluster
6280     # disk changes
6281     for disk_op, disk_dict in self.op.disks:
6282       if disk_op == constants.DDM_REMOVE:
6283         # remove the last disk
6284         device = instance.disks.pop()
6285         device_idx = len(instance.disks)
6286         for node, disk in device.ComputeNodeTree(instance.primary_node):
6287           self.cfg.SetDiskID(disk, node)
6288           msg = self.rpc.call_blockdev_remove(node, disk).RemoteFailMsg()
6289           if msg:
6290             self.LogWarning("Could not remove disk/%d on node %s: %s,"
6291                             " continuing anyway", device_idx, node, msg)
6292         result.append(("disk/%d" % device_idx, "remove"))
6293       elif disk_op == constants.DDM_ADD:
6294         # add a new disk
6295         if instance.disk_template == constants.DT_FILE:
6296           file_driver, file_path = instance.disks[0].logical_id
6297           file_path = os.path.dirname(file_path)
6298         else:
6299           file_driver = file_path = None
6300         disk_idx_base = len(instance.disks)
6301         new_disk = _GenerateDiskTemplate(self,
6302                                          instance.disk_template,
6303                                          instance.name, instance.primary_node,
6304                                          instance.secondary_nodes,
6305                                          [disk_dict],
6306                                          file_path,
6307                                          file_driver,
6308                                          disk_idx_base)[0]
6309         instance.disks.append(new_disk)
6310         info = _GetInstanceInfoText(instance)
6311
6312         logging.info("Creating volume %s for instance %s",
6313                      new_disk.iv_name, instance.name)
6314         # Note: this needs to be kept in sync with _CreateDisks
6315         #HARDCODE
6316         for node in instance.all_nodes:
6317           f_create = node == instance.primary_node
6318           try:
6319             _CreateBlockDev(self, node, instance, new_disk,
6320                             f_create, info, f_create)
6321           except errors.OpExecError, err:
6322             self.LogWarning("Failed to create volume %s (%s) on"
6323                             " node %s: %s",
6324                             new_disk.iv_name, new_disk, node, err)
6325         result.append(("disk/%d" % disk_idx_base, "add:size=%s,mode=%s" %
6326                        (new_disk.size, new_disk.mode)))
6327       else:
6328         # change a given disk
6329         instance.disks[disk_op].mode = disk_dict['mode']
6330         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
6331     # NIC changes
6332     for nic_op, nic_dict in self.op.nics:
6333       if nic_op == constants.DDM_REMOVE:
6334         # remove the last nic
6335         del instance.nics[-1]
6336         result.append(("nic.%d" % len(instance.nics), "remove"))
6337       elif nic_op == constants.DDM_ADD:
6338         # mac and bridge should be set, by now
6339         mac = nic_dict['mac']
6340         ip = nic_dict.get('ip', None)
6341         nicparams = self.nic_pinst[constants.DDM_ADD]
6342         new_nic = objects.NIC(mac=mac, ip=ip, nicparams=nicparams)
6343         instance.nics.append(new_nic)
6344         result.append(("nic.%d" % (len(instance.nics) - 1),
6345                        "add:mac=%s,ip=%s,mode=%s,link=%s" %
6346                        (new_nic.mac, new_nic.ip,
6347                         self.nic_pnew[constants.DDM_ADD][constants.NIC_MODE],
6348                         self.nic_pnew[constants.DDM_ADD][constants.NIC_LINK]
6349                        )))
6350       else:
6351         for key in 'mac', 'ip':
6352           if key in nic_dict:
6353             setattr(instance.nics[nic_op], key, nic_dict[key])
6354         if nic_op in self.nic_pnew:
6355           instance.nics[nic_op].nicparams = self.nic_pnew[nic_op]
6356         for key, val in nic_dict.iteritems():
6357           result.append(("nic.%s/%d" % (key, nic_op), val))
6358
6359     # hvparams changes
6360     if self.op.hvparams:
6361       instance.hvparams = self.hv_inst
6362       for key, val in self.op.hvparams.iteritems():
6363         result.append(("hv/%s" % key, val))
6364
6365     # beparams changes
6366     if self.op.beparams:
6367       instance.beparams = self.be_inst
6368       for key, val in self.op.beparams.iteritems():
6369         result.append(("be/%s" % key, val))
6370
6371     self.cfg.Update(instance)
6372
6373     return result
6374
6375
6376 class LUQueryExports(NoHooksLU):
6377   """Query the exports list
6378
6379   """
6380   _OP_REQP = ['nodes']
6381   REQ_BGL = False
6382
6383   def ExpandNames(self):
6384     self.needed_locks = {}
6385     self.share_locks[locking.LEVEL_NODE] = 1
6386     if not self.op.nodes:
6387       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6388     else:
6389       self.needed_locks[locking.LEVEL_NODE] = \
6390         _GetWantedNodes(self, self.op.nodes)
6391
6392   def CheckPrereq(self):
6393     """Check prerequisites.
6394
6395     """
6396     self.nodes = self.acquired_locks[locking.LEVEL_NODE]
6397
6398   def Exec(self, feedback_fn):
6399     """Compute the list of all the exported system images.
6400
6401     @rtype: dict
6402     @return: a dictionary with the structure node->(export-list)
6403         where export-list is a list of the instances exported on
6404         that node.
6405
6406     """
6407     rpcresult = self.rpc.call_export_list(self.nodes)
6408     result = {}
6409     for node in rpcresult:
6410       if rpcresult[node].RemoteFailMsg():
6411         result[node] = False
6412       else:
6413         result[node] = rpcresult[node].payload
6414
6415     return result
6416
6417
6418 class LUExportInstance(LogicalUnit):
6419   """Export an instance to an image in the cluster.
6420
6421   """
6422   HPATH = "instance-export"
6423   HTYPE = constants.HTYPE_INSTANCE
6424   _OP_REQP = ["instance_name", "target_node", "shutdown"]
6425   REQ_BGL = False
6426
6427   def ExpandNames(self):
6428     self._ExpandAndLockInstance()
6429     # FIXME: lock only instance primary and destination node
6430     #
6431     # Sad but true, for now we have do lock all nodes, as we don't know where
6432     # the previous export might be, and and in this LU we search for it and
6433     # remove it from its current node. In the future we could fix this by:
6434     #  - making a tasklet to search (share-lock all), then create the new one,
6435     #    then one to remove, after
6436     #  - removing the removal operation altoghether
6437     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6438
6439   def DeclareLocks(self, level):
6440     """Last minute lock declaration."""
6441     # All nodes are locked anyway, so nothing to do here.
6442
6443   def BuildHooksEnv(self):
6444     """Build hooks env.
6445
6446     This will run on the master, primary node and target node.
6447
6448     """
6449     env = {
6450       "EXPORT_NODE": self.op.target_node,
6451       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
6452       }
6453     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
6454     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
6455           self.op.target_node]
6456     return env, nl, nl
6457
6458   def CheckPrereq(self):
6459     """Check prerequisites.
6460
6461     This checks that the instance and node names are valid.
6462
6463     """
6464     instance_name = self.op.instance_name
6465     self.instance = self.cfg.GetInstanceInfo(instance_name)
6466     assert self.instance is not None, \
6467           "Cannot retrieve locked instance %s" % self.op.instance_name
6468     _CheckNodeOnline(self, self.instance.primary_node)
6469
6470     self.dst_node = self.cfg.GetNodeInfo(
6471       self.cfg.ExpandNodeName(self.op.target_node))
6472
6473     if self.dst_node is None:
6474       # This is wrong node name, not a non-locked node
6475       raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node)
6476     _CheckNodeOnline(self, self.dst_node.name)
6477     _CheckNodeNotDrained(self, self.dst_node.name)
6478
6479     # instance disk type verification
6480     for disk in self.instance.disks:
6481       if disk.dev_type == constants.LD_FILE:
6482         raise errors.OpPrereqError("Export not supported for instances with"
6483                                    " file-based disks")
6484
6485   def Exec(self, feedback_fn):
6486     """Export an instance to an image in the cluster.
6487
6488     """
6489     instance = self.instance
6490     dst_node = self.dst_node
6491     src_node = instance.primary_node
6492     if self.op.shutdown:
6493       # shutdown the instance, but not the disks
6494       result = self.rpc.call_instance_shutdown(src_node, instance)
6495       msg = result.RemoteFailMsg()
6496       if msg:
6497         raise errors.OpExecError("Could not shutdown instance %s on"
6498                                  " node %s: %s" %
6499                                  (instance.name, src_node, msg))
6500
6501     vgname = self.cfg.GetVGName()
6502
6503     snap_disks = []
6504
6505     # set the disks ID correctly since call_instance_start needs the
6506     # correct drbd minor to create the symlinks
6507     for disk in instance.disks:
6508       self.cfg.SetDiskID(disk, src_node)
6509
6510     try:
6511       for disk in instance.disks:
6512         # result.payload will be a snapshot of an lvm leaf of the one we passed
6513         result = self.rpc.call_blockdev_snapshot(src_node, disk)
6514         msg = result.RemoteFailMsg()
6515         if msg:
6516           self.LogWarning("Could not snapshot block device %s on node %s: %s",
6517                           disk.logical_id[1], src_node, msg)
6518           snap_disks.append(False)
6519         else:
6520           disk_id = (vgname, result.payload)
6521           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
6522                                  logical_id=disk_id, physical_id=disk_id,
6523                                  iv_name=disk.iv_name)
6524           snap_disks.append(new_dev)
6525
6526     finally:
6527       if self.op.shutdown and instance.admin_up:
6528         result = self.rpc.call_instance_start(src_node, instance, None, None)
6529         msg = result.RemoteFailMsg()
6530         if msg:
6531           _ShutdownInstanceDisks(self, instance)
6532           raise errors.OpExecError("Could not start instance: %s" % msg)
6533
6534     # TODO: check for size
6535
6536     cluster_name = self.cfg.GetClusterName()
6537     for idx, dev in enumerate(snap_disks):
6538       if dev:
6539         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
6540                                                instance, cluster_name, idx)
6541         msg = result.RemoteFailMsg()
6542         if msg:
6543           self.LogWarning("Could not export block device %s from node %s to"
6544                           " node %s: %s", dev.logical_id[1], src_node,
6545                           dst_node.name, msg)
6546         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
6547         if msg:
6548           self.LogWarning("Could not remove snapshot block device %s from node"
6549                           " %s: %s", dev.logical_id[1], src_node, msg)
6550
6551     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
6552     msg = result.RemoteFailMsg()
6553     if msg:
6554       self.LogWarning("Could not finalize export for instance %s"
6555                       " on node %s: %s", instance.name, dst_node.name, msg)
6556
6557     nodelist = self.cfg.GetNodeList()
6558     nodelist.remove(dst_node.name)
6559
6560     # on one-node clusters nodelist will be empty after the removal
6561     # if we proceed the backup would be removed because OpQueryExports
6562     # substitutes an empty list with the full cluster node list.
6563     iname = instance.name
6564     if nodelist:
6565       exportlist = self.rpc.call_export_list(nodelist)
6566       for node in exportlist:
6567         if exportlist[node].RemoteFailMsg():
6568           continue
6569         if iname in exportlist[node].payload:
6570           msg = self.rpc.call_export_remove(node, iname).RemoteFailMsg()
6571           if msg:
6572             self.LogWarning("Could not remove older export for instance %s"
6573                             " on node %s: %s", iname, node, msg)
6574
6575
6576 class LURemoveExport(NoHooksLU):
6577   """Remove exports related to the named instance.
6578
6579   """
6580   _OP_REQP = ["instance_name"]
6581   REQ_BGL = False
6582
6583   def ExpandNames(self):
6584     self.needed_locks = {}
6585     # We need all nodes to be locked in order for RemoveExport to work, but we
6586     # don't need to lock the instance itself, as nothing will happen to it (and
6587     # we can remove exports also for a removed instance)
6588     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
6589
6590   def CheckPrereq(self):
6591     """Check prerequisites.
6592     """
6593     pass
6594
6595   def Exec(self, feedback_fn):
6596     """Remove any export.
6597
6598     """
6599     instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
6600     # If the instance was not found we'll try with the name that was passed in.
6601     # This will only work if it was an FQDN, though.
6602     fqdn_warn = False
6603     if not instance_name:
6604       fqdn_warn = True
6605       instance_name = self.op.instance_name
6606
6607     locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
6608     exportlist = self.rpc.call_export_list(locked_nodes)
6609     found = False
6610     for node in exportlist:
6611       msg = exportlist[node].RemoteFailMsg()
6612       if msg:
6613         self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
6614         continue
6615       if instance_name in exportlist[node].payload:
6616         found = True
6617         result = self.rpc.call_export_remove(node, instance_name)
6618         msg = result.RemoteFailMsg()
6619         if msg:
6620           logging.error("Could not remove export for instance %s"
6621                         " on node %s: %s", instance_name, node, msg)
6622
6623     if fqdn_warn and not found:
6624       feedback_fn("Export not found. If trying to remove an export belonging"
6625                   " to a deleted instance please use its Fully Qualified"
6626                   " Domain Name.")
6627
6628
6629 class TagsLU(NoHooksLU):
6630   """Generic tags LU.
6631
6632   This is an abstract class which is the parent of all the other tags LUs.
6633
6634   """
6635
6636   def ExpandNames(self):
6637     self.needed_locks = {}
6638     if self.op.kind == constants.TAG_NODE:
6639       name = self.cfg.ExpandNodeName(self.op.name)
6640       if name is None:
6641         raise errors.OpPrereqError("Invalid node name (%s)" %
6642                                    (self.op.name,))
6643       self.op.name = name
6644       self.needed_locks[locking.LEVEL_NODE] = name
6645     elif self.op.kind == constants.TAG_INSTANCE:
6646       name = self.cfg.ExpandInstanceName(self.op.name)
6647       if name is None:
6648         raise errors.OpPrereqError("Invalid instance name (%s)" %
6649                                    (self.op.name,))
6650       self.op.name = name
6651       self.needed_locks[locking.LEVEL_INSTANCE] = name
6652
6653   def CheckPrereq(self):
6654     """Check prerequisites.
6655
6656     """
6657     if self.op.kind == constants.TAG_CLUSTER:
6658       self.target = self.cfg.GetClusterInfo()
6659     elif self.op.kind == constants.TAG_NODE:
6660       self.target = self.cfg.GetNodeInfo(self.op.name)
6661     elif self.op.kind == constants.TAG_INSTANCE:
6662       self.target = self.cfg.GetInstanceInfo(self.op.name)
6663     else:
6664       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
6665                                  str(self.op.kind))
6666
6667
6668 class LUGetTags(TagsLU):
6669   """Returns the tags of a given object.
6670
6671   """
6672   _OP_REQP = ["kind", "name"]
6673   REQ_BGL = False
6674
6675   def Exec(self, feedback_fn):
6676     """Returns the tag list.
6677
6678     """
6679     return list(self.target.GetTags())
6680
6681
6682 class LUSearchTags(NoHooksLU):
6683   """Searches the tags for a given pattern.
6684
6685   """
6686   _OP_REQP = ["pattern"]
6687   REQ_BGL = False
6688
6689   def ExpandNames(self):
6690     self.needed_locks = {}
6691
6692   def CheckPrereq(self):
6693     """Check prerequisites.
6694
6695     This checks the pattern passed for validity by compiling it.
6696
6697     """
6698     try:
6699       self.re = re.compile(self.op.pattern)
6700     except re.error, err:
6701       raise errors.OpPrereqError("Invalid search pattern '%s': %s" %
6702                                  (self.op.pattern, err))
6703
6704   def Exec(self, feedback_fn):
6705     """Returns the tag list.
6706
6707     """
6708     cfg = self.cfg
6709     tgts = [("/cluster", cfg.GetClusterInfo())]
6710     ilist = cfg.GetAllInstancesInfo().values()
6711     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
6712     nlist = cfg.GetAllNodesInfo().values()
6713     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
6714     results = []
6715     for path, target in tgts:
6716       for tag in target.GetTags():
6717         if self.re.search(tag):
6718           results.append((path, tag))
6719     return results
6720
6721
6722 class LUAddTags(TagsLU):
6723   """Sets a tag on a given object.
6724
6725   """
6726   _OP_REQP = ["kind", "name", "tags"]
6727   REQ_BGL = False
6728
6729   def CheckPrereq(self):
6730     """Check prerequisites.
6731
6732     This checks the type and length of the tag name and value.
6733
6734     """
6735     TagsLU.CheckPrereq(self)
6736     for tag in self.op.tags:
6737       objects.TaggableObject.ValidateTag(tag)
6738
6739   def Exec(self, feedback_fn):
6740     """Sets the tag.
6741
6742     """
6743     try:
6744       for tag in self.op.tags:
6745         self.target.AddTag(tag)
6746     except errors.TagError, err:
6747       raise errors.OpExecError("Error while setting tag: %s" % str(err))
6748     try:
6749       self.cfg.Update(self.target)
6750     except errors.ConfigurationError:
6751       raise errors.OpRetryError("There has been a modification to the"
6752                                 " config file and the operation has been"
6753                                 " aborted. Please retry.")
6754
6755
6756 class LUDelTags(TagsLU):
6757   """Delete a list of tags from a given object.
6758
6759   """
6760   _OP_REQP = ["kind", "name", "tags"]
6761   REQ_BGL = False
6762
6763   def CheckPrereq(self):
6764     """Check prerequisites.
6765
6766     This checks that we have the given tag.
6767
6768     """
6769     TagsLU.CheckPrereq(self)
6770     for tag in self.op.tags:
6771       objects.TaggableObject.ValidateTag(tag)
6772     del_tags = frozenset(self.op.tags)
6773     cur_tags = self.target.GetTags()
6774     if not del_tags <= cur_tags:
6775       diff_tags = del_tags - cur_tags
6776       diff_names = ["'%s'" % tag for tag in diff_tags]
6777       diff_names.sort()
6778       raise errors.OpPrereqError("Tag(s) %s not found" %
6779                                  (",".join(diff_names)))
6780
6781   def Exec(self, feedback_fn):
6782     """Remove the tag from the object.
6783
6784     """
6785     for tag in self.op.tags:
6786       self.target.RemoveTag(tag)
6787     try:
6788       self.cfg.Update(self.target)
6789     except errors.ConfigurationError:
6790       raise errors.OpRetryError("There has been a modification to the"
6791                                 " config file and the operation has been"
6792                                 " aborted. Please retry.")
6793
6794
6795 class LUTestDelay(NoHooksLU):
6796   """Sleep for a specified amount of time.
6797
6798   This LU sleeps on the master and/or nodes for a specified amount of
6799   time.
6800
6801   """
6802   _OP_REQP = ["duration", "on_master", "on_nodes"]
6803   REQ_BGL = False
6804
6805   def ExpandNames(self):
6806     """Expand names and set required locks.
6807
6808     This expands the node list, if any.
6809
6810     """
6811     self.needed_locks = {}
6812     if self.op.on_nodes:
6813       # _GetWantedNodes can be used here, but is not always appropriate to use
6814       # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
6815       # more information.
6816       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
6817       self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
6818
6819   def CheckPrereq(self):
6820     """Check prerequisites.
6821
6822     """
6823
6824   def Exec(self, feedback_fn):
6825     """Do the actual sleep.
6826
6827     """
6828     if self.op.on_master:
6829       if not utils.TestDelay(self.op.duration):
6830         raise errors.OpExecError("Error during master delay test")
6831     if self.op.on_nodes:
6832       result = self.rpc.call_test_delay(self.op.on_nodes, self.op.duration)
6833       if not result:
6834         raise errors.OpExecError("Complete failure from rpc call")
6835       for node, node_result in result.items():
6836         node_result.Raise()
6837         if not node_result.data:
6838           raise errors.OpExecError("Failure during rpc call to node %s,"
6839                                    " result: %s" % (node, node_result.data))
6840
6841
6842 class IAllocator(object):
6843   """IAllocator framework.
6844
6845   An IAllocator instance has three sets of attributes:
6846     - cfg that is needed to query the cluster
6847     - input data (all members of the _KEYS class attribute are required)
6848     - four buffer attributes (in|out_data|text), that represent the
6849       input (to the external script) in text and data structure format,
6850       and the output from it, again in two formats
6851     - the result variables from the script (success, info, nodes) for
6852       easy usage
6853
6854   """
6855   _ALLO_KEYS = [
6856     "mem_size", "disks", "disk_template",
6857     "os", "tags", "nics", "vcpus", "hypervisor",
6858     ]
6859   _RELO_KEYS = [
6860     "relocate_from",
6861     ]
6862
6863   def __init__(self, lu, mode, name, **kwargs):
6864     self.lu = lu
6865     # init buffer variables
6866     self.in_text = self.out_text = self.in_data = self.out_data = None
6867     # init all input fields so that pylint is happy
6868     self.mode = mode
6869     self.name = name
6870     self.mem_size = self.disks = self.disk_template = None
6871     self.os = self.tags = self.nics = self.vcpus = None
6872     self.hypervisor = None
6873     self.relocate_from = None
6874     # computed fields
6875     self.required_nodes = None
6876     # init result fields
6877     self.success = self.info = self.nodes = None
6878     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6879       keyset = self._ALLO_KEYS
6880     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6881       keyset = self._RELO_KEYS
6882     else:
6883       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
6884                                    " IAllocator" % self.mode)
6885     for key in kwargs:
6886       if key not in keyset:
6887         raise errors.ProgrammerError("Invalid input parameter '%s' to"
6888                                      " IAllocator" % key)
6889       setattr(self, key, kwargs[key])
6890     for key in keyset:
6891       if key not in kwargs:
6892         raise errors.ProgrammerError("Missing input parameter '%s' to"
6893                                      " IAllocator" % key)
6894     self._BuildInputData()
6895
6896   def _ComputeClusterData(self):
6897     """Compute the generic allocator input data.
6898
6899     This is the data that is independent of the actual operation.
6900
6901     """
6902     cfg = self.lu.cfg
6903     cluster_info = cfg.GetClusterInfo()
6904     # cluster data
6905     data = {
6906       "version": constants.IALLOCATOR_VERSION,
6907       "cluster_name": cfg.GetClusterName(),
6908       "cluster_tags": list(cluster_info.GetTags()),
6909       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
6910       # we don't have job IDs
6911       }
6912     iinfo = cfg.GetAllInstancesInfo().values()
6913     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
6914
6915     # node data
6916     node_results = {}
6917     node_list = cfg.GetNodeList()
6918
6919     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
6920       hypervisor_name = self.hypervisor
6921     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
6922       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
6923
6924     node_data = self.lu.rpc.call_node_info(node_list, cfg.GetVGName(),
6925                                            hypervisor_name)
6926     node_iinfo = self.lu.rpc.call_all_instances_info(node_list,
6927                        cluster_info.enabled_hypervisors)
6928     for nname, nresult in node_data.items():
6929       # first fill in static (config-based) values
6930       ninfo = cfg.GetNodeInfo(nname)
6931       pnr = {
6932         "tags": list(ninfo.GetTags()),
6933         "primary_ip": ninfo.primary_ip,
6934         "secondary_ip": ninfo.secondary_ip,
6935         "offline": ninfo.offline,
6936         "drained": ninfo.drained,
6937         "master_candidate": ninfo.master_candidate,
6938         }
6939
6940       if not ninfo.offline:
6941         msg = nresult.RemoteFailMsg()
6942         if msg:
6943           raise errors.OpExecError("Can't get data for node %s: %s" %
6944                                    (nname, msg))
6945         msg = node_iinfo[nname].RemoteFailMsg()
6946         if msg:
6947           raise errors.OpExecError("Can't get node instance info"
6948                                    " from node %s: %s" % (nname, msg))
6949         remote_info = nresult.payload
6950         for attr in ['memory_total', 'memory_free', 'memory_dom0',
6951                      'vg_size', 'vg_free', 'cpu_total']:
6952           if attr not in remote_info:
6953             raise errors.OpExecError("Node '%s' didn't return attribute"
6954                                      " '%s'" % (nname, attr))
6955           if not isinstance(remote_info[attr], int):
6956             raise errors.OpExecError("Node '%s' returned invalid value"
6957                                      " for '%s': %s" %
6958                                      (nname, attr, remote_info[attr]))
6959         # compute memory used by primary instances
6960         i_p_mem = i_p_up_mem = 0
6961         for iinfo, beinfo in i_list:
6962           if iinfo.primary_node == nname:
6963             i_p_mem += beinfo[constants.BE_MEMORY]
6964             if iinfo.name not in node_iinfo[nname].payload:
6965               i_used_mem = 0
6966             else:
6967               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]['memory'])
6968             i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem
6969             remote_info['memory_free'] -= max(0, i_mem_diff)
6970
6971             if iinfo.admin_up:
6972               i_p_up_mem += beinfo[constants.BE_MEMORY]
6973
6974         # compute memory used by instances
6975         pnr_dyn = {
6976           "total_memory": remote_info['memory_total'],
6977           "reserved_memory": remote_info['memory_dom0'],
6978           "free_memory": remote_info['memory_free'],
6979           "total_disk": remote_info['vg_size'],
6980           "free_disk": remote_info['vg_free'],
6981           "total_cpus": remote_info['cpu_total'],
6982           "i_pri_memory": i_p_mem,
6983           "i_pri_up_memory": i_p_up_mem,
6984           }
6985         pnr.update(pnr_dyn)
6986
6987       node_results[nname] = pnr
6988     data["nodes"] = node_results
6989
6990     # instance data
6991     instance_data = {}
6992     for iinfo, beinfo in i_list:
6993       nic_data = []
6994       for nic in iinfo.nics:
6995         filled_params = objects.FillDict(
6996             cluster_info.nicparams[constants.PP_DEFAULT],
6997             nic.nicparams)
6998         nic_dict = {"mac": nic.mac,
6999                     "ip": nic.ip,
7000                     "mode": filled_params[constants.NIC_MODE],
7001                     "link": filled_params[constants.NIC_LINK],
7002                    }
7003         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
7004           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
7005         nic_data.append(nic_dict)
7006       pir = {
7007         "tags": list(iinfo.GetTags()),
7008         "admin_up": iinfo.admin_up,
7009         "vcpus": beinfo[constants.BE_VCPUS],
7010         "memory": beinfo[constants.BE_MEMORY],
7011         "os": iinfo.os,
7012         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
7013         "nics": nic_data,
7014         "disks": [{"size": dsk.size, "mode": dsk.mode} for dsk in iinfo.disks],
7015         "disk_template": iinfo.disk_template,
7016         "hypervisor": iinfo.hypervisor,
7017         }
7018       pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
7019                                                  pir["disks"])
7020       instance_data[iinfo.name] = pir
7021
7022     data["instances"] = instance_data
7023
7024     self.in_data = data
7025
7026   def _AddNewInstance(self):
7027     """Add new instance data to allocator structure.
7028
7029     This in combination with _AllocatorGetClusterData will create the
7030     correct structure needed as input for the allocator.
7031
7032     The checks for the completeness of the opcode must have already been
7033     done.
7034
7035     """
7036     data = self.in_data
7037
7038     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
7039
7040     if self.disk_template in constants.DTS_NET_MIRROR:
7041       self.required_nodes = 2
7042     else:
7043       self.required_nodes = 1
7044     request = {
7045       "type": "allocate",
7046       "name": self.name,
7047       "disk_template": self.disk_template,
7048       "tags": self.tags,
7049       "os": self.os,
7050       "vcpus": self.vcpus,
7051       "memory": self.mem_size,
7052       "disks": self.disks,
7053       "disk_space_total": disk_space,
7054       "nics": self.nics,
7055       "required_nodes": self.required_nodes,
7056       }
7057     data["request"] = request
7058
7059   def _AddRelocateInstance(self):
7060     """Add relocate instance data to allocator structure.
7061
7062     This in combination with _IAllocatorGetClusterData will create the
7063     correct structure needed as input for the allocator.
7064
7065     The checks for the completeness of the opcode must have already been
7066     done.
7067
7068     """
7069     instance = self.lu.cfg.GetInstanceInfo(self.name)
7070     if instance is None:
7071       raise errors.ProgrammerError("Unknown instance '%s' passed to"
7072                                    " IAllocator" % self.name)
7073
7074     if instance.disk_template not in constants.DTS_NET_MIRROR:
7075       raise errors.OpPrereqError("Can't relocate non-mirrored instances")
7076
7077     if len(instance.secondary_nodes) != 1:
7078       raise errors.OpPrereqError("Instance has not exactly one secondary node")
7079
7080     self.required_nodes = 1
7081     disk_sizes = [{'size': disk.size} for disk in instance.disks]
7082     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
7083
7084     request = {
7085       "type": "relocate",
7086       "name": self.name,
7087       "disk_space_total": disk_space,
7088       "required_nodes": self.required_nodes,
7089       "relocate_from": self.relocate_from,
7090       }
7091     self.in_data["request"] = request
7092
7093   def _BuildInputData(self):
7094     """Build input data structures.
7095
7096     """
7097     self._ComputeClusterData()
7098
7099     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
7100       self._AddNewInstance()
7101     else:
7102       self._AddRelocateInstance()
7103
7104     self.in_text = serializer.Dump(self.in_data)
7105
7106   def Run(self, name, validate=True, call_fn=None):
7107     """Run an instance allocator and return the results.
7108
7109     """
7110     if call_fn is None:
7111       call_fn = self.lu.rpc.call_iallocator_runner
7112     data = self.in_text
7113
7114     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
7115     result.Raise()
7116
7117     if not isinstance(result.data, (list, tuple)) or len(result.data) != 4:
7118       raise errors.OpExecError("Invalid result from master iallocator runner")
7119
7120     rcode, stdout, stderr, fail = result.data
7121
7122     if rcode == constants.IARUN_NOTFOUND:
7123       raise errors.OpExecError("Can't find allocator '%s'" % name)
7124     elif rcode == constants.IARUN_FAILURE:
7125       raise errors.OpExecError("Instance allocator call failed: %s,"
7126                                " output: %s" % (fail, stdout+stderr))
7127     self.out_text = stdout
7128     if validate:
7129       self._ValidateResult()
7130
7131   def _ValidateResult(self):
7132     """Process the allocator results.
7133
7134     This will process and if successful save the result in
7135     self.out_data and the other parameters.
7136
7137     """
7138     try:
7139       rdict = serializer.Load(self.out_text)
7140     except Exception, err:
7141       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
7142
7143     if not isinstance(rdict, dict):
7144       raise errors.OpExecError("Can't parse iallocator results: not a dict")
7145
7146     for key in "success", "info", "nodes":
7147       if key not in rdict:
7148         raise errors.OpExecError("Can't parse iallocator results:"
7149                                  " missing key '%s'" % key)
7150       setattr(self, key, rdict[key])
7151
7152     if not isinstance(rdict["nodes"], list):
7153       raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
7154                                " is not a list")
7155     self.out_data = rdict
7156
7157
7158 class LUTestAllocator(NoHooksLU):
7159   """Run allocator tests.
7160
7161   This LU runs the allocator tests
7162
7163   """
7164   _OP_REQP = ["direction", "mode", "name"]
7165
7166   def CheckPrereq(self):
7167     """Check prerequisites.
7168
7169     This checks the opcode parameters depending on the director and mode test.
7170
7171     """
7172     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7173       for attr in ["name", "mem_size", "disks", "disk_template",
7174                    "os", "tags", "nics", "vcpus"]:
7175         if not hasattr(self.op, attr):
7176           raise errors.OpPrereqError("Missing attribute '%s' on opcode input" %
7177                                      attr)
7178       iname = self.cfg.ExpandInstanceName(self.op.name)
7179       if iname is not None:
7180         raise errors.OpPrereqError("Instance '%s' already in the cluster" %
7181                                    iname)
7182       if not isinstance(self.op.nics, list):
7183         raise errors.OpPrereqError("Invalid parameter 'nics'")
7184       for row in self.op.nics:
7185         if (not isinstance(row, dict) or
7186             "mac" not in row or
7187             "ip" not in row or
7188             "bridge" not in row):
7189           raise errors.OpPrereqError("Invalid contents of the"
7190                                      " 'nics' parameter")
7191       if not isinstance(self.op.disks, list):
7192         raise errors.OpPrereqError("Invalid parameter 'disks'")
7193       for row in self.op.disks:
7194         if (not isinstance(row, dict) or
7195             "size" not in row or
7196             not isinstance(row["size"], int) or
7197             "mode" not in row or
7198             row["mode"] not in ['r', 'w']):
7199           raise errors.OpPrereqError("Invalid contents of the"
7200                                      " 'disks' parameter")
7201       if not hasattr(self.op, "hypervisor") or self.op.hypervisor is None:
7202         self.op.hypervisor = self.cfg.GetHypervisorType()
7203     elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
7204       if not hasattr(self.op, "name"):
7205         raise errors.OpPrereqError("Missing attribute 'name' on opcode input")
7206       fname = self.cfg.ExpandInstanceName(self.op.name)
7207       if fname is None:
7208         raise errors.OpPrereqError("Instance '%s' not found for relocation" %
7209                                    self.op.name)
7210       self.op.name = fname
7211       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
7212     else:
7213       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
7214                                  self.op.mode)
7215
7216     if self.op.direction == constants.IALLOCATOR_DIR_OUT:
7217       if not hasattr(self.op, "allocator") or self.op.allocator is None:
7218         raise errors.OpPrereqError("Missing allocator name")
7219     elif self.op.direction != constants.IALLOCATOR_DIR_IN:
7220       raise errors.OpPrereqError("Wrong allocator test '%s'" %
7221                                  self.op.direction)
7222
7223   def Exec(self, feedback_fn):
7224     """Run the allocator test.
7225
7226     """
7227     if self.op.mode == constants.IALLOCATOR_MODE_ALLOC:
7228       ial = IAllocator(self,
7229                        mode=self.op.mode,
7230                        name=self.op.name,
7231                        mem_size=self.op.mem_size,
7232                        disks=self.op.disks,
7233                        disk_template=self.op.disk_template,
7234                        os=self.op.os,
7235                        tags=self.op.tags,
7236                        nics=self.op.nics,
7237                        vcpus=self.op.vcpus,
7238                        hypervisor=self.op.hypervisor,
7239                        )
7240     else:
7241       ial = IAllocator(self,
7242                        mode=self.op.mode,
7243                        name=self.op.name,
7244                        relocate_from=list(self.relocate_from),
7245                        )
7246
7247     if self.op.direction == constants.IALLOCATOR_DIR_IN:
7248       result = ial.in_text
7249     else:
7250       ial.Run(self.op.allocator, validate=False)
7251       result = ial.out_text
7252     return result